diff --git a/src/PrimingProb.py b/src/PrimingProb.py deleted file mode 100644 index bb755fc731b4746c76517a34a544e5f566daeaf6..0000000000000000000000000000000000000000 --- a/src/PrimingProb.py +++ /dev/null @@ -1,101 +0,0 @@ -"""Imports.""" -import numpy as np -import scipy.constants - - -class Probability: - """Calculates the probability of priming and write the gff file.""" - - def InterPara(): - """Open the RIblast output file and read only the parameter lines. - - Args: - None - - Returns: - my_list (list): Contains all the paramter lines from RIblast - """ - myfile = open("./Docker-Files/Energies.txt", "r") # ouput of RIblast - - mylist = [] # all lines of Energies starting with an ID-number - - for myline in myfile: # Read lines containing needed data - if myline[0].isdigit(): - mylist.append(myline) - else: - continue - myfile.close() - - return(mylist) - - data = InterPara() - - def InterProb(data_list): - """Calculate the prob. and make the gff file. - - Args: - data_list (list): Contains all parameters of RIblast - - Returns: - gff (file): Gff file contains all the output information - """ - para_list = [] - - for i in range(0, len(data_list)): - x = data_list[i].split(",") - para_list.append(x) - # splitting each list item by the "," this results in a 2-D list - - for j in range(0, len(para_list)): - del para_list[j][1:-2] - # only keeps the ID-numer, the interaction - # energy, and interaction site of both sequences. (still a 2D-list) - - for d in range(0, len(para_list)): # Optimize location output - a = para_list[d][2].split(":") - a[1] = a[1].replace(") ", "") - a[1] = a[1].replace("\n", "") - a[1] = a[1].replace("-", " ") - a[1] = a[1].split(" ") - para_list[d][2] = a[1] - - for k in range(0, len(para_list)): # type-conversion of ID and E - for l in range(0, 2): - para_list[k][l] = float(para_list[k][l]) - - for z in range(0, len(para_list)): # from kcal/mol to Joule/mol - para_list[z][1] = para_list[z][1] * 4184 - - kT = scipy.constants.R * 300.15 # calculating gas constant R * T - - for u in range(0, len(para_list)): # calculating -E / RT - para_list[u][1] = (-(para_list[u][1])/kT) - - prob_list = [] # List containing all the prob. - - for h in range(0, len(para_list)): # calculating the e^(-E/kT) - probab = np.exp(para_list[h][1]) - prob_list.append(probab) - para_list[h][1] = probab - - prob_list_sum = sum(prob_list) # Sum of all probabilities - real_prob = [] - - for v in range(0, len(para_list)): # Normalized probabilities - prob_linear = (para_list[v][1])/prob_list_sum - real_prob.append(prob_linear) - para_list[v][1] = prob_linear - - gff = open("./output/Potential_Priming_sites.txt", "w+") # gff file - - for i in range(0, len(para_list)): - gff.write("Interactions: "+str(para_list[i][0]) + - "\tRIblast\ttranscript\t"+str(para_list[i][2][1])+"\t" + - str(para_list[i][2][0])+"\t" + - str(para_list[i][1])+"\t.\t.\t.\n") - - gff.close - - return gff - - InterProb(data) diff --git a/src/PrimingProb_Final.py b/src/PrimingProb_Final.py new file mode 100644 index 0000000000000000000000000000000000000000..a1adb8cd7d41561c29812cae8c13addb21805052 --- /dev/null +++ b/src/PrimingProb_Final.py @@ -0,0 +1,173 @@ +"""Imports.""" +import numpy as np +import scipy.constants +import argparse +from pathlib import Path + + +class Probability: + """Calculates the probability of priming and write the gff file.""" + + # adding parser + parser = argparse.ArgumentParser( + description="Fasta-file input", + add_help=False, + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + ) + # add arguments + parser.add_argument( + 'input_file', + type=lambda p: Path(p).absolute(), + metavar="PATH", + help="path to fasta-file", + ) + + args = parser.parse_args() + + def InterPara(path): + """Open the RIblast output file and read only the parameter lines. + + Args: + Path to Fasta-file + + Returns: + my_list (list): Contains all the paramter lines from RIblast + """ + # myfile = open(sys.argv[1], "r") # ouput of RIblast + myfile = open(path, "r") + + mylist = [] # all lines of Energies starting with an ID-number + + for myline in myfile: # Read lines containing needed data + if myline[0].isdigit(): + mylist.append(myline) + else: + continue + + myfile.close() + + return(mylist) + + data = InterPara(args.input_file) + + def InterProb(data_list): + """Calculate the prob. and make the gff file. + + Args: + data_list (list): Contains all parameters of RIblast + + Returns: + gff (file): Gff file contains all the output information + """ + # count interactions per script through fasta ID (first line of fasta) + mycounter = open("./Docker-Files/transcript.fasta", "r") + + mycounter_list = [] + + for mylinecounter in mycounter: + if mylinecounter.startswith(">"): + a = mylinecounter + a = mylinecounter.replace(">", "") + b = a.replace("\n", "") + mycounter_list.append(b) + else: + continue + + counter = 0 + counter_list = [] + + for cc in range(0, len(mycounter_list)): + for dd in range(0, len(data_list)): + if mycounter_list[cc] in data_list[dd]: + counter = counter + 1 + else: + continue + counter_list.append(counter) + counter = 0 + + para_list = [] + + for i in range(0, len(data_list)): + x = data_list[i].split(",") + para_list.append(x) + # splitting each list item by the "," this results in a 2-D list + + for j in range(0, len(para_list)): + del para_list[j][1:-2] + # only keeps the ID-numer, the interaction + # energy, and interaction site of both sequences. (still a 2D-list) + + for d in range(0, len(para_list)): # Optimize location output + a = para_list[d][2].split(":") + a[1] = a[1].replace(") ", "") + a[1] = a[1].replace("\n", "") + a[1] = a[1].replace("-", " ") + a[1] = a[1].split(" ") + para_list[d][2] = a[1] + + for k in range(0, len(para_list)): # type-conversion of ID and E + for w in range(0, 2): + para_list[k][w] = float(para_list[k][w]) + + for z in range(0, len(para_list)): # from kcal/mol to Joule/mol + para_list[z][1] = para_list[z][1] * 4184 + + kT = scipy.constants.R * 300.15 # calculating gas constant R * T + + for u in range(0, len(para_list)): # calculating -E / RT + para_list[u][1] = (-(para_list[u][1])/kT) + + prob_list = [] # List containing all the prob. + + for h in range(0, len(para_list)): # calculating the e^(-E/kT) + probab = np.exp(para_list[h][1]) + prob_list.append(probab) + para_list[h][1] = probab + + count_sum = 0 + sum_list = [] + + prob_list2 = prob_list.copy() + + for jj in range(0, len(counter_list)): + for ii in range(0, counter_list[jj]): + count_sum = count_sum + prob_list[ii] + sum_list.append(count_sum) + count_sum = 0 + del prob_list[0:counter_list[jj]] + + real_prob = [] + + for jj in range(0, len(sum_list)): + for ii in range(0, counter_list[jj]): + prob_list2[ii] = prob_list2[ii]/sum_list[jj] + real_prob.append(prob_list2[ii]) + del prob_list2[0:counter_list[jj]] # Normalized probabilities + + # real_prob contains all the linearized probabilities + + for vv in range(0, len(para_list)): + para_list[vv][1] = real_prob[vv] + + final_list = [] + + for bb in range(0, len(sum_list)): # Insert ID in paralist + for ss in range(0, counter_list[bb]): + para_list[ss][0] = mycounter_list[bb] + final_list.append(para_list[ss]) + del para_list[0:counter_list[bb]] + + gff = open("./output/Potential_Priming_sites.txt", "w+") # gff file + + for ll in range(0, len(final_list)): + gff.write(str(final_list[ll][0]) + + "\tRIblast\ttranscript\t" + + str(final_list[ll][2][1])+"\t" + + str(final_list[ll][2][0])+"\t" + + str(final_list[ll][1])+"\t.\t.\t.\n") + + gff.close + + return gff + + InterProb(data) diff --git a/src/RIblast_process2.nf b/src/RIblast_process3.nf similarity index 93% rename from src/RIblast_process2.nf rename to src/RIblast_process3.nf index 66f1017b11f5adc3bb946cbcdb3aa1b5d5bfb35b..eb35b4cdb1fdf4d208af6f5fa8dc694bb532c274 100644 --- a/src/RIblast_process2.nf +++ b/src/RIblast_process3.nf @@ -1,6 +1,6 @@ #!/usr/bin/env nextflow -params.transcripts = "$baseDir/Docker-Files/transcript3.fasta" +params.transcripts = "$baseDir/Docker-Files/transcript2.fasta" params.primers = "$baseDir/Docker-Files/primer.fasta" log.info """\ @@ -23,3 +23,4 @@ process RIblast_interaction { RIblast ris -i $primer -o /RIblast/Energies.txt -d /RIblast/test_db """ } + diff --git a/src/parameter_parser.py b/src/parameter_parser.py new file mode 100644 index 0000000000000000000000000000000000000000..61af08e707a9cf3485432b50007ea516dc8f15ae --- /dev/null +++ b/src/parameter_parser.py @@ -0,0 +1,72 @@ +"""Module containing functionalities to store run parameters. + +Class: + ParamParse: Take as input a file containing the parameters + and stores them in its attributes. +""" +import logging +from pathlib import Path + +LOG = logging.getLogger(__name__) + + +class ParamParse: + """Class holding the parameters of the run. + + Args: + param_file: Path to file with parameter values. + + Attributes: + param_file: File with parameter values. + transcripts_file: File with transcript abundances. + genome_ref_file: Reference genome file. + annotations_file: Transcripts annotations. + output_path: Output folder. + n_reads: Number of reads to be simulated. + n_cells: Number of cells to be simulated. + rna_avg_length: average RNA fragment length. + rna_sd_length: RNA fragment length standard deviation. + read_length: Read length. + intron_rate: Constant probability of retaining an intron. + add_poly_a: Boolean option to add a poly A tail. + poly_a_func: Function to add a poly_a tail. + primer_seq: Sequence of the primer. + priming_func: Function that evaluates internal priming. + """ + + def __init__(self, param_file: Path) -> None: + """Class constructor.""" + self.param_file: Path = Path(param_file) + with open(param_file) as f: + LOG.info("Loading parameters...") + for line in f: + s = line.split(':') + if s[0] == 'Csv transcripts file': + self.transcripts_file: Path = Path(s[1].strip()) + elif s[0] == 'Reference genome file': + self.genome_ref_file: Path = Path(s[1].strip()) + elif s[0] == 'Transcripts annotation file': + self.annotations_file: Path = Path(s[1].strip()) + elif s[0] == 'Output folder': + self.output_path: Path = Path(s[1].strip()) + elif s[0] == 'Number of reads': + self.n_reads: int = int(s[1].strip()) + elif s[0] == 'Number of cells': + self.n_cells: int = int(s[1].strip()) + elif s[0] == 'Average RNA fragments length': + self.rna_avg: float = float(s[1].strip()) + elif s[0] == 'RNA fragment length standard deviation': + self.rna_sd_length: float = float(s[1].strip()) + elif s[0] == 'Reads length': + self.read_length: int = int(s[1].strip()) + elif s[0] == 'Intron retaining probability': + self.intron_rate: float = float(s[1].strip()) + elif s[0] == 'Add poly A tail': + self.add_poly_a: bool = bool(s[1].strip()) + elif s[0] == 'Function to add poly A tail': + self.poly_a_func: str = str(s[1].strip()) + elif s[0] == 'Primer sequence': + self.primer_seq: str = str(s[1].strip()) + elif s[0] == 'Function to evaluate internal priming': + self.priming_func: str = str(s[1].strip()) + LOG.info("Parameters loaded.") diff --git a/tests/resources/Param_test.txt b/tests/resources/Param_test.txt new file mode 100644 index 0000000000000000000000000000000000000000..6af6df2c32ab69c6a3d776a5cc0909a890bea1a0 --- /dev/null +++ b/tests/resources/Param_test.txt @@ -0,0 +1,27 @@ +Csv transcripts file: ./transcripts.csv + +Reference genome file: ./home/ref.ref + +Transcripts annotation file: ./home/annotations.ann + +Output folder: ./home/output + +Number of reads: 10023 + +Number of cells: 34 + +Average RNA fragments length: 150 + +RNA fragment length standard deviation: 10 + +Reads length: 100 + +Intron retaining probability: 0.2 + +Add poly A tail: TRUE + +Function to add poly A tail: generate_poly_a + +Primer sequence: ACCTGATCGTACG + +Function to evaluate internal priming: internal_priming \ No newline at end of file diff --git a/tests/test_parser.py b/tests/test_parser.py new file mode 100644 index 0000000000000000000000000000000000000000..364ce9b8dfad69cf60749b7daf78e9c05c326667 --- /dev/null +++ b/tests/test_parser.py @@ -0,0 +1,25 @@ +"""Tests the parameter parser class.""" + +import pytest +from pathlib import Path +from src import parameter_parser as pp +from src import poly_a + +def test_parser(): + """Tests the attributes of the class.""" + par=pp.ParamParse('./tests/resources/Param_test.txt') + assert par.param_file == Path('./tests/resources/Param_test.txt') + assert par.transcripts_file == Path('./transcripts.csv') + assert par.genome_ref_file == Path('./home/ref.ref') + assert par.annotations_file == Path('./home/annotations.ann') + assert par.output_path == Path('./home/output') + assert par.n_reads == 10023 + assert par.n_cells == 34 + assert par.rna_avg == 150 + assert par.rna_sd_length == 10 + assert par.read_length == 100 + assert par.intron_rate == 0.2 + assert par.add_poly_a == bool('TRUE') + assert par.poly_a_func == 'generate_poly_a' + assert par.primer_seq == 'ACCTGATCGTACG' + assert par.priming_func == 'internal_priming' \ No newline at end of file diff --git a/tests/test_sampleinput.py b/tests/test_sampleinput.py index f73dfb9dedc3d9436a585f75f356685f59e30e9a..3f81651fa79f0e6cedb59c24ec8322141870c9af 100644 --- a/tests/test_sampleinput.py +++ b/tests/test_sampleinput.py @@ -1,4 +1,4 @@ -"""Placeholder test for pipeline.""" +"""Tests the transcriptome abundance file input reader.""" import pytest import pandas as pd