From e1f9cf862ab4a298d524ec63adf9aca082c29ea7 Mon Sep 17 00:00:00 2001 From: LauraU123 <laura.urbanska@stud.unibas.ch> Date: Wed, 9 Nov 2022 10:31:37 +0100 Subject: [PATCH] added trancript extractor script --- scripts/representativeTranscript.py | 84 -------- scripts/transkript_extractor.py | 287 ++++++++++++++++++++++++++++ 2 files changed, 287 insertions(+), 84 deletions(-) delete mode 100644 scripts/representativeTranscript.py create mode 100644 scripts/transkript_extractor.py diff --git a/scripts/representativeTranscript.py b/scripts/representativeTranscript.py deleted file mode 100644 index f5649f2..0000000 --- a/scripts/representativeTranscript.py +++ /dev/null @@ -1,84 +0,0 @@ - -import pandas as pd -import re - -''' -This part of the code find a representative transcript for each ID given in the csv input file. -It works with the output of the function "transcript selector". -It return a csv file containing the names of the representative transcript and their levels. - -''' - -#import csv file [ID, Expression_level] and create a df -def import_csv_to_df(csv_file) : - df_csv = pd.read_csv(csv_file, names=["ID", "Expression_level"]) - return df_csv - - -#import modified gtf file and create a df -def import_gtfSelection_to_df(gtf_modified_file): - - #create a df from the tab separated file input - df_input =pd.read_csv(gtf_modified_file, sep='\t', lineterminator='\n', -names =["Gene_mixed", "Transcript", "Support_level", "Length", "Line_index", "Na"] ) - - #Create a new column with only gene name from Gene_mixed column - df_input["Gene"] = df_input["Gene_mixed"].str.extract('([A-Z]\w{0,})', expand=True) - - #Create a new column with only transcript number from Gene_mixed column - df_input["Transcript_number"] = df_input["Gene_mixed"].str.extract('(^\d)', expand=True) - - #Create a new df with relevant column and without NA - df_clean = df_input.loc[:, ["Gene","Transcript_number","Transcript","Support_level", "Length", "Line_index"]] - df_clean["Gene"] = df_clean["Gene"].fillna(method='ffill') - df_clean = df_clean.dropna(axis=0) - return df_clean - - - -# Return a df containing representative transcripts and their expression level from genes mentioned in the csv file - -def representative_from_gene(df_gtfSelection, df_csv): - #check wich genes are shared by df_gtfSelection and df_csv and create a reduced df - df_shared = df_gtfSelection[(df_gtfSelection["Gene"].isin(df_csv["ID"]))]# /!\ return an empty list if I do the same with "Transcript" column. - - #pick only the transcripts with the highest support level (best is = 1 ) - df_filtered = df_shared[df_shared["Support_level"]==df_shared["Support_level"].min()] - - #pick the transcript with the greatest length if multiple transcript have the same support level - df_filtered2 = df_filtered.sort_index().groupby(df_filtered["Gene"]).max() - - #combine expression level with transcript with highest support level and greatest length - df_filtered2["Expression_level"] = df_filtered2.Gene.map(df_csv.set_index("ID")['Expression_level']) - - #create a reduced df with only the representative transcript and its support level - df_final = df_filtered2[["Transcript","Expression_level"]] - print("This is your representative transcripts : \n \n {}".format(df_final)) - - return df_final - - -#Missing conditions : - # If multiple transcript form the same gene, add their expression - # If given transcript name, return the most representative transcript - - - -# Output csv file containing representative transcript levels and id -def write_csv_Transcript_Expression_level(df_representative): - df_final = df_representative[["Transcript","Expression_level"]] - with open("transcripts.csv", "w") as fileout : - fileout.write(df_representative.to_csv(columns=["Transcript","Expression_level"], index=False)) - -### put your inputs here ! ### - -ID_levels_csv = "scRNA\input.csv" # put your csv file here -gtf_file = "scRNA/test_inter_mediat_file.txt" # put your gtf selected file here - -df_gtf = import_gtfSelection_to_df(gtf_file) -df_csv_input = import_csv_to_df(ID_levels_csv) - -representative_transcript = representative_from_gene(df_gtf, df_csv_input) - -write_csv_Transcript_Expression_level(representative_transcript) # create a csv file named "transcript.csv" - diff --git a/scripts/transkript_extractor.py b/scripts/transkript_extractor.py new file mode 100644 index 0000000..9f69a0d --- /dev/null +++ b/scripts/transkript_extractor.py @@ -0,0 +1,287 @@ +#### Transcript extractor ##### + +### Called Packages ### +import re +import os + +### Functions ### + +def __searche_for_preexisting_files(file_name,deposit_pathway_name = os.getcwd()): + """This function searches for preexisting files of the same name as the results file of the current program. It allows the user to choose to move on with the pre-existing file """ + File_of_same_name_found = False + generat_new_file = False + directory_content = os.listdir(deposit_pathway_name) + for file in directory_content: + Search_profile = file_name+"_intermediat_file.txt" + if file == Search_profile: + while True: + File_found_input = input ("An intermediate file has allready been generated from this file\nDo you want to generate a new one [y/n] \n>") + if File_found_input == "n": + File_of_same_name_found = True + break + elif File_found_input == "y": + generat_new_file = True + break + else: + print("Sorry this was not a valid input\nPlease press [y] if you want to generate a new file or [n] if you want to use the preexisting file") + break + else: + continue + if File_of_same_name_found: + print("No new file will be generated, the program can continue") + elif generat_new_file: + print("A new file will be generated please wait...\n") + else: + print("No pre-existing intermediate file based on the currend file have been found.\nA new file will be generated please wait...\n") + return(File_of_same_name_found) + +def __test_file_name(file_name,source_pathway_name = os.getcwd()): + """This function validates that the source file exists at the source path. It turns the file name input in a standardized format that can be used in the next steps""" + + directory_content = os.listdir(source_pathway_name) + + index_of_the_dot = file_name.rfind(".") + valide_source_file = False + validate_source_file = True + if index_of_the_dot ==-1: + file_name += ".gtf" + else: + source_file_typ = file_name[index_of_the_dot:] + not_a_file_type = re.compile(".\d{1,13}") + try_not_a_file_type = not_a_file_type.search(source_file_typ) + if source_file_typ == ".gtf": + file_name = file_name + elif try_not_a_file_type: + file_name += ".gtf" + else: + print("This program can not handle",source_file_typ,"files. \nplease use a .gtf file" ) + validate_source_file = False + #The block above tests if the file_name includes the file type and if no + #file type is found adds ".gtf" und if a non ".gtf" file is found gives an error + + if validate_source_file: + for file in directory_content: + if file == file_name: + valide_source_file = True + break + #The block above tests if a file on the given name is in the given directora + + if valide_source_file: + print("The file:",file_name,"has been found.\n") + else: + print("No .gtf file of the name",file_name,"has been found in this pathway") + #The bock above gives feed back regarding the results of the file test + + file_name = file_name.replace(".gtf","") + #This line normalizes the file name + return(valide_source_file,file_name) + +def __do_pathways_exist__(source_pathway_name,deposit_pathway_name): + """This funtion tests that the entered pathways actualy exist""" + does_source_pathway_exist = os.path.exists(source_pathway_name) + does_deposit_pathway_exist = os.path.exists(deposit_pathway_name) + #The Block above does the actual testing + if does_source_pathway_exist: + source_pathway_name = source_pathway_name + else: + print("The source pathway:",source_pathway_name,"has not been found\nThe source pathway was set to the default") + source_pathway_name = os.getcwd() + #The block above detail the possible reactions for the source pathe existing or not existing + if does_deposit_pathway_exist: + deposit_pathway_name = deposit_pathway_name + else: + print("The deposit pathway:",deposit_pathway_name,"has not been found\nThe deposit pathway was set to the default") + deposit_pathway_name = source_pathway_name + #The block above details the possible reactions for the deposit pathway existing or not existing + return(source_pathway_name,deposit_pathway_name) + +def gene_ID_finder(entry): + """This function is supposed to finde the gen ID of a known gen entry""" + index_gene_id = entry.find("gene_id") + #This line determines where the transcript ID is + find_gene_id_name = re.compile("\"\S{1,25}\"") + #This line defines the pattern of a transcript ID + sub_entry = entry[index_gene_id:] + #This line generates a subentra starting at the locatind of the tanscript ID + try_find_gene_id_name = find_gene_id_name.search(sub_entry) + gene_ID = try_find_gene_id_name[0].replace("\"","") + #The block above findes the transcript ID and changes it into a usable format + return (gene_ID) + +def transcript_ID_finder (entry): + """This function is supposed to finde the transcript ID in a known transcript entry""" + index_transcript_id = entry.find("transcript_id") + #This line determines where the transcript ID is + find_transcript_id_name = re.compile("\"\S{1,25}\"") + #This line defines the pattern of a transcript ID + sub_entry = entry[index_transcript_id:] + #This line generates a subentra starting at the locatind of the tanscript ID + try_find_transcript_id_name = find_transcript_id_name.search(sub_entry) + + try: + transcript_ID = try_find_transcript_id_name[0].replace("\"","") + #The block above findes the transcript ID and changes it into a usable format + except: + transcript_ID = "" + return (transcript_ID) + +def transcript_support_level_finder(entry): + """This function is supposed to find the transcript support level in a known transcript entry""" + transcript_support_level_start_ID = entry.find("transcript_support_level") + sub_entry = entry[transcript_support_level_start_ID:] + + try: + #the try and except clauses are there for the case that there is + #no actual score given + score_fidner = re.compile("\W\w{1,16}\W{2}") + try_score_finder = score_fidner.search(sub_entry) + Pre_score_1 = try_score_finder[0] + Pre_score_2 = Pre_score_1.replace("\"","") + Pre_score_2 = Pre_score_2.replace("(","") + transcript_support_level = Pre_score_2.replace(";","") + if transcript_support_level == "NA": + transcript_support_level = 100 + + except: + #will give NA if no numerical score is found + transcript_support_level = 100 + return (transcript_support_level) + + + + +def _transcript_extractor (file_name,source_pathway_name,deposit_pathway_name): + """This funtion extracts the transcript number ,transcript ID, the transcript support level, the transcrip length and the line index from a gtf file of a given name and saves tham as a new file name given_name_intermediat_file.txt. It only works in the directory of the skript at this point""" + with open(source_pathway_name+"\\"+file_name+".gtf", 'r') as f: + #this line opens the file in question + + Old_gen_ID = str() + #stand-in as the first couple entrys are not genes + with open(deposit_pathway_name+"\\"+file_name+"_"+"intermediat_file"+".txt","w") as IMF: + #creates a new file that will have the same name as the source file + #but with an added _inter_mediat_file + transcript_number = 0 + for entry in f: + #this loop reads all lines in the source file one by one + Gen_finder = re.compile("gene_name") + try_gen_finder = Gen_finder.search(entry) + #the lines above determin if the is a "gene_name" collumn + #in the current entry + if (try_gen_finder): + Gen_ID = gene_ID_finder(entry) + else: + Gen_ID = Old_gen_ID + #determins the gene_name using the function showen futher up + if Gen_ID != Old_gen_ID: + #chachs if the gene is new or if it is an entry belonging + #to the previous gen e.g. a transkript, exon ... + Gen_entry = ">"+str(Gen_ID)+"\n" + IMF.write(Gen_entry) + #if the Gen is new a new line is inserted in the intermediat + #file to seperate the transcripts belonging to the different genes + transcript_number = 0 + #the transcript number indicates how many transcripts have + #been found of the same gene befor the current one it starts + #at 1 for each new gene + Old_gen_ID = Gen_ID + #the new_name is set as the Old meaning current name + + transkript_finder = re.compile("\ttranscript\t") + try_transkript_finder = transkript_finder.search(entry) + #the lines above serve to identify if the current entry blonges + #to a transcript + + if try_transkript_finder: + #this confition is activated if the entry belongs to a transcript + transcript_number += 1 + #the transcript number is updated (starting at on given that t + #he default is 0) + Transcript_ID = transcript_ID_finder (entry) + #the function that determins the transcript ID is called + transcript_support_level = transcript_support_level_finder(entry) + #the function that determins the transcript support level is called + New_entry = str(transcript_number)+"\t"+str(Transcript_ID)+"\t"+str(transcript_support_level)+"\t"+"\t\n" + IMF.write(New_entry) + #this lines assemble the transcript number ,transcript ID + #and the transcript support level into a new line for the + #intermediat file and adds them in + print("The transcripts have been collected") + +def __parameter_editor(file_name,source_pathway_name,deposit_pathway_name): + """This funtion allows for chaging the parameters after running the program""" + while True: + #This is the main loot wich allows show the current partameters and + #allows the usert to shouse if the want to use edit the paremeters or + #to continue with the given parameters + print("The program will run with the following parameters:\nFile name:\t\t",file_name,"\nSource pathway:\t",source_pathway_name,"\nDeposit pathway:\t",deposit_pathway_name,"\n") + parameter_conformation = input("To continue with these parameters input [continue or c] to change them input [edit]\n>") + #The block above incluts the "table" whith the current parameters as well as the input prompt + if parameter_conformation == "continue"or parameter_conformation =="c": + #This if clause will end the main loop and the funtion + break + elif parameter_conformation == "edit": + #This conditin lead into the secundary loop which allows the user to edit the parameters + while True: + change_question = input("select the parameter you want to change [nfile/spath/dpath] or input [b] to go back\n>") + #This is the input prompt for the secundary loop + if change_question == "nfile": + #This condition allows the user to chenge the file name + file_name = input("Pleas input the new file name\n>") + break + elif change_question == "spath": + #This condition allows the user to change the source path + source_pathway_name = input("Pleas input the new source path\n>") + + does_source_pathway_exist = os.path.exists(source_pathway_name) + if does_source_pathway_exist: + break + else: + print("The new source pathway:",source_pathway_name,"does not existe\nThe source pathway was returnt to default:",os.getcwd()) + source_pathway_name = os.getcwd() + #The block above test that if the new source pathway is valid + elif change_question == "dpath": + #This condition allows the user to change the deposit path + deposit_pathway_name = input("Pleas input the new deposit path name\n>") + + does_deposit_pathway_exist = os.path.exists(deposit_pathway_name) + if does_deposit_pathway_exist: + break + else: + print("The new deposit pathway:",deposit_pathway_name,"does not existe\nThe deposit pathway was returnt to default:",source_pathway_name) + deposit_pathway_name = source_pathway_name + #The block above test if the new deposit pathway is valid + elif change_question == "b": + # This condition allows the user to return to the main loop + break + else: + #This condition covers all non valid inputs into the secund loop + print("The input",change_question,"is not valide please use one of the specifyed comands") + + else: + #This condition covers all non valid input for the main loop + print("The input",parameter_conformation,"is not valide please use one of the specifyed comands\n") + return(file_name,source_pathway_name,deposit_pathway_name) + +def extract_transkript (file_name = "test",source_pathway_name = os.getcwd(),deposit_pathway_name = False): + """This it the overall exetutable funtion that will execute the transcript extraction process for a given file with all checks. + The default file name is "test". This function will also return the file name, the source pathway and the depisti pathway that have been used to generate the intermediat file""" + if deposit_pathway_name == False: + deposit_pathway_name = source_pathway_name + file_name,source_pathway_name,deposit_pathway_name = __parameter_editor(file_name,source_pathway_name,deposit_pathway_name) + source_pathway_name,deposit_pathway_name =__do_pathways_exist__(source_pathway_name,deposit_pathway_name) + validated_file_name = __test_file_name(file_name,source_pathway_name) + file_name = validated_file_name[1] + if validated_file_name[0]: + if __searche_for_preexisting_files(file_name,deposit_pathway_name): + print("The transcripts has been collected\n") + else: + _transcript_extractor (file_name,source_pathway_name,deposit_pathway_name) + return(file_name,source_pathway_name,deposit_pathway_name) + +#### Dev part #### + +if __name__ == "__main__": + extract_transkript() +#This line allows the file to be executed on its own also from + + -- GitLab