diff --git a/src/primingprob/cli_priming_prob.py b/src/primingprob/cli_priming_prob.py new file mode 100644 index 0000000000000000000000000000000000000000..0a9e0e76432b8350ea4529acc40a4bbd59177c38 --- /dev/null +++ b/src/primingprob/cli_priming_prob.py @@ -0,0 +1,63 @@ +"""Command-line interface client.""" +import argparse +import os +from pathlib import Path +import sys +from src.primingprob.priming_prob import Probability as Pbt + + +def parse_args(): + """Parse CLI arguments. + + Returns: + Parsed CLI arguments. + """ + parser = argparse.ArgumentParser( + description="RIblast input", + add_help=False, + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + ) + + parser.add_argument( + 'input_file', + type=lambda p: Path(p).absolute(), + metavar="PATH", + help="path to energy-file", + ) + + parser.add_argument( + 'fasta_file', + type=lambda p: Path(p).absolute(), + metavar="PATH", + help="path to fasta-file", + ) + + parser.add_argument( + 'output_file', + type=lambda p: Path(p).absolute(), + metavar="PATH", + help="path to output-file", + ) + + return parser.parse_args() + + +def main(): + """Start priming_prob.py.""" + args = parse_args() + + if os.path.exists(args.input_file) and os.path.exists(args.fasta_file): # pragma: no cover + paradata = Pbt.inter_para(args.input_file) + Pbt.inter_prob(paradata, args.fasta_file, args.output_file) + + if not os.path.exists(args.input_file) and os.path.exists(args.fasta_file): # pragma: no cover + sys.exit("Path to input-file does not exist") + + +def init(): + """Entry point for CLI executable.""" + if __name__ == '__main__': + main() + + +init() diff --git a/src/primingprob/priming_prob.py b/src/primingprob/priming_prob.py new file mode 100644 index 0000000000000000000000000000000000000000..352e43b2cf392ed97725f0a9f0e5b4e93ca4c210 --- /dev/null +++ b/src/primingprob/priming_prob.py @@ -0,0 +1,154 @@ +"""Imports.""" +import numpy as np +import scipy.constants + + +class Probability: + """Calculates the probability of priming and write the gff file.""" + + def inter_para(input_path): + """Open the RIblast output file and read only the parameter lines. + + Args: + Path to Energies.txt file + + Returns: + my_list (list): Contains all the paramter lines from Energies.txt + """ + Energies = open(input_path, "r") # ouput of RIblast + + mylist = [] # all lines of Energies starting with an ID-number + + for myline in Energies: # Read lines containing needed data + if myline[0].isdigit(): + mylist.append(myline) + elif myline[0].isdigit() is False: + continue + + Energies.close() + + return(mylist) + + def inter_prob(data_list, fasta_path, output_path): + """Calculate the prob. and write the gff file. + + Args: + data_list (list): Contains all parameters of Energies.txt + fasta_path (path): Path to fasta file + output_path (path): Path to output file + + Returns: + gff (file): Gff file contains all the output information + """ + # count interactions per script through fasta-ID + fastafile = open(fasta_path, "r") + id_list = [] # contains list of transcipt IDs + + for mylinecounter in fastafile: + if mylinecounter.startswith(">"): + a = mylinecounter + a = mylinecounter.replace(">", "") + b = a.split() + c = b[0] + id_list.append(c) + elif mylinecounter.startswith(">") is False: + continue + + counter = 0 + counter_list = [] # list of number of interactions per transcript + + for cc in range(0, len(id_list)): + for dd in range(0, len(data_list)): + if id_list[cc] in data_list[dd]: + counter = counter + 1 + + counter_list.append(counter) + counter = 0 + + para_list = [] + + for i in range(0, len(data_list)): + x = data_list[i].split(",") + para_list.append(x) + # splitting each list item by the "," this results in a 2D list + + for j in range(0, len(para_list)): + del para_list[j][1:-2] + # only keeps the ID-numer, the interaction + # energy, and interaction site of both sequences. (still a 2D-list) + + start_end_index = 1 # index of start and end in the list + + for d in range(0, len(para_list)): # Optimize location output + a = para_list[d][2].split(":") + a[start_end_index] = a[start_end_index].replace(") ", "") + a[start_end_index] = a[start_end_index].replace("\n", "") + a[start_end_index] = a[start_end_index].replace("-", " ") + a[start_end_index] = a[start_end_index].split(" ") + para_list[d][2] = a[start_end_index] + + for k in range(0, len(para_list)): # type-conversion of ID and E + for w in range(0, 2): # "2" because the first two elements in each sublist are ID and E + para_list[k][w] = float(para_list[k][w]) + + joule = 4184 # 1kcal = 4184 joule + inter_energy = 1 # index of inter. energy in list + + for z in range(0, len(para_list)): # from kcal/mol to Joule/mol + para_list[z][inter_energy] = para_list[z][inter_energy] * joule + + T = 300.15 # Roomtemperature (27 degree celsius) in Kelvin + kT = scipy.constants.R * T # calculating gas constant R * T + + for u in range(0, len(para_list)): # calculating -E / RT + para_list[u][inter_energy] = (-(para_list[u][inter_energy])/kT) + + prob_list = [] # List containing all the prob. + + for h in range(0, len(para_list)): # calculating the e^(-E/kT) + probab = np.exp(para_list[h][inter_energy]) + prob_list.append(probab) + para_list[h][inter_energy] = probab + + probability_sum = 0 # variable to calculate sum of probabilities per transcipt + sum_list = [] # List containing all the sums + + prob_list2 = prob_list.copy() + + for jj in range(0, len(counter_list)): + for ii in range(0, counter_list[jj]): + probability_sum = probability_sum + prob_list[ii] + sum_list.append(probability_sum) + probability_sum = 0 # setting back to 0, for next interactions + del prob_list[0:counter_list[jj]] + + real_prob = [] # contains all the normalized probabilities + + for jj in range(0, len(sum_list)): + for ii in range(0, counter_list[jj]): + prob_list2[ii] = prob_list2[ii]/sum_list[jj] + real_prob.append(prob_list2[ii]) + del prob_list2[0:counter_list[jj]] # Normalized probabilities + + for vv in range(0, len(para_list)): # inserting the normalized values in para_list + para_list[vv][1] = real_prob[vv] + + final_list = [] # List containing all the final paramters to print + + for bb in range(0, len(sum_list)): # Insert ID in paralist + for ss in range(0, counter_list[bb]): + para_list[ss][0] = id_list[bb] + final_list.append(para_list[ss]) + del para_list[0:counter_list[bb]] + + gff = open(output_path, "w+") # gff file + + for ll in range(0, len(final_list)): # writing gff file + gff.write(str(final_list[ll][0]) + # transcript ID + "\tRIblast\tPriming Site\t" + + str(final_list[ll][2][1])+"\t" + # start + str(final_list[ll][2][0])+"\t" + # end + str(final_list[ll][1])+"\t.\t.\t.\n") # probability + + gff.close + return final_list