Skip to content
Snippets Groups Projects
Commit 995f7d7b authored by Hugo Madge Leon's avatar Hugo Madge Leon
Browse files

linting + small fixes

parent 6d6438c2
No related branches found
No related tags found
1 merge request!31linting + small fixes
......@@ -52,6 +52,7 @@ Temporary Items
# Ignore all local history of files
.history
.ionide
.vscode
# End of https://www.toptal.com/developers/gitignore/api/macos,visualstudiocode
......
"""Fragment sequences."""
import re
import numpy as np
......@@ -5,9 +6,19 @@ import pandas as pd
def fasta_process(fasta_file):
"""
Pre-process FASTA file.
Args:
fasta_file (fasta): FASTA file with cDNA sequences
Returns:
dict: Dictionary of gene sequence IDs and their sequence
"""
with open(fasta_file, "r") as f:
lines = f.readlines()
# Tanya, try \\S instead of \S and see if that works
ident_pattern = re.compile('>(\S+)')
seq_pattern = re.compile('^(\S+)$')
......@@ -21,24 +32,44 @@ def fasta_process(fasta_file):
genes[seq_id] = (seq_pattern.search(line)).group(1)
return genes
def fragmentation(fasta_file, counts_file, mean_length, std, a_prob, t_prob, g_prob, c_prob):
def fragmentation(fasta_file, counts_file, mean_length, std,
a_prob, t_prob, g_prob, c_prob):
"""
Fragment cDNA sequences and select terminal fragment.
Args:
fasta_file (fasta): FASTA file with cDNA sequences
counts_file (text): CSV or TSV file woth sequence counts
mean_length (int): mean length of desired fragments
std (int): standard deviation of desired fragment lengths
a_prob (float): probability of nucleotide A
t_prob (float): probability of nucleotide T
g_prob (float): probability of nucleotide G
c_prob (float): probability of nucleotide C
Returns:
list: list of selected terminal fragments
"""
fasta = fasta_process(fasta_file)
seq_counts = pd.read_csv(counts_file, names = ["seqID", "count"])
seq_counts = pd.read_csv(counts_file,
names=["seqID", "count"])
# nucs = ['A','T','G','C']
# mononuc_freqs = [0.22, 0.25, 0.23, 0.30]
nuc_probs = {'A':a_prob, 'T':t_prob, 'G':g_prob, 'C':c_prob} # calculated using https://www.nature.com/articles/srep04532#MOESM1
# calculated using https://www.nature.com/articles/srep04532#MOESM1
nuc_probs = {'A': a_prob, 'T': t_prob, 'G': g_prob, 'C': c_prob}
term_frags = []
term_frags = []
for seq_id, seq in fasta.items():
counts = seq_counts[seq_counts["seqID"] == seq_id]["count"]
for _ in range(counts):
for _ in range(counts):
n_cuts = int(len(seq)/mean_length)
# non-uniformly random DNA fragmentation implementation based on https://www.nature.com/articles/srep04532#Sec1
# non-uniformly random DNA fragmentation implementation based on
# https://www.nature.com/articles/srep04532#Sec1
# assume fragmentation by sonication for NGS workflow
cuts = []
cut_nucs = np.random.choice(list(nuc_probs.keys()), n_cuts, p=list(nuc_probs.values()))
cut_nucs = np.random.choice(list(nuc_probs.keys()),
n_cuts, p=list(nuc_probs.values()))
for nuc in cut_nucs:
nuc_pos = [x.start() for x in re.finditer(nuc, seq)]
pos = np.random.choice(nuc_pos)
......@@ -46,8 +77,8 @@ def fragmentation(fasta_file, counts_file, mean_length, std, a_prob, t_prob, g_p
pos = np.random.choice(nuc_pos)
cuts.append(pos)
cuts.sort()
cuts.insert(0,0)
cuts.sort()
cuts.insert(0, 0)
term_frag = ""
for i, val in enumerate(cuts):
if i == len(cuts)-1:
......@@ -61,4 +92,3 @@ def fragmentation(fasta_file, counts_file, mean_length, std, a_prob, t_prob, g_p
else:
term_frags.append(term_frag)
return term_frags
"""Receive command line arguments, fragment, and output fragments."""
import argparse
from fragmentation_v2 import fragmentation
from fragmentation import fragmentation
from utils import check_positive, extant_file, check_prob
def main(args):
def main(args):
"""
Use CLI arguments to fragment sequences and output text file \
with selected terminal fragments.
Args:
args (parser): list of arguments from CLI.
"""
fasta, seq_counts, mean_length, std, a_prob, t_prob, g_prob, c_prob = args
term_frags = fragmentation(fasta, seq_counts, mean_length, std, a_prob, t_prob, g_prob, c_prob)
term_frags = fragmentation(fasta, seq_counts, mean_length, std,
a_prob, t_prob, g_prob, c_prob)
with open('terminal_frags.txt', 'w') as f:
for line in term_frags:
f.write(line)
f.write('\n')
f.write(f"{line}\n")
# Parse command-line arguments
def parse_arguments():
parser = argparse.ArgumentParser(description="Takes as input FASTA file of cDNA sequences, a CSV with sequence counts, and mean and std. dev. of fragment lengths and 4 nucleotide probabilities for the cuts. Outputs most terminal fragment (within desired length range) for each sequence.")
parser.add_argument('--fasta', required=True, type=extant_file, help="FASTA file with cDNA sequences")
parser.add_argument('--counts', required=True, type=extant_file, help="CSV file with sequence counts")
parser.add_argument('--mean', required = False, default = 300, type = check_positive, help="Mean fragment length (default: 10)")
parser.add_argument('--std', required = False, default = 60, type = check_positive, help="Standard deviation fragment length (defafult: 1)")
parser.add_argument('--A_prob', required=False, default = 0.22, type=check_prob, help="Probability cut happens after nucleotide A")
parser.add_argument('--T_prob', required=False, default = 0.25, type=check_prob, help="Probability cut happens after nucleotide T")
parser.add_argument('--G_prob', required=False, default = 0.25, type=check_prob, help="Probability cut happens after nucleotide G")
parser.add_argument('--C_prob', required=False, default = 0.28, type=check_prob, help="Probability cut happens after nucleotide C")
"""
Request parameters from user on CLI.
Returns:
parser: list of arguments from CLI.
"""
parser = argparse.ArgumentParser(description="""Takes as input FASTA file
of cDNA sequences, a CSV with sequence
counts, and mean and std. dev. of fragment
lengths and 4 nucleotide probabilities
for the cuts. Outputs most terminal
fragment (within desired length range)
for each sequence.""")
parser.add_argument('--fasta', required=True, type=extant_file,
help="FASTA file with cDNA sequences")
parser.add_argument('--counts', required=True, type=extant_file,
help="CSV file with sequence counts")
parser.add_argument('--mean', required=False, default=300,
type=check_positive,
help="Mean fragment length (default: 10)")
parser.add_argument('--std', required=False, default=60,
type=check_positive,
help="Standard deviation fragment length \
(defafult: 1)")
parser.add_argument('--A_prob', required=False, default=0.22,
type=check_prob,
help="Probability cut happens after nucleotide A")
parser.add_argument('--T_prob', required=False, default=0.25,
type=check_prob,
help="Probability cut happens after nucleotide T")
parser.add_argument('--G_prob', required=False, default=0.25,
type=check_prob,
help="Probability cut happens after nucleotide G")
parser.add_argument('--C_prob', required=False, default=0.28,
type=check_prob,
help="Probability cut happens after nucleotide C")
args = parser.parse_args()
return args
if __name__ == '__main__':
arguments = parse_arguments()
main(arguments)
\ No newline at end of file
main(arguments)
"""Utility functions for command line arguments."""
import argparse
import os.path
# found on https://stackoverflow.com/questions/11540854/file-as-command-line-argument-for-argparse-error-message-if-argument-is-not-va
# found on shorturl.at/vzAX4
def extant_file(x):
"""
'Type' for argparse - checks that file exists but does not open.
"""
if not os.path.exists(x):
# Argparse uses the ArgumentTypeError to give a rejection message like:
# error: argument input: x does not exist
raise argparse.ArgumentTypeError("{0} does not exist".format(x))
elif not x.endswith((".fasta", ".fa", ".csv")):
raise argparse.ArgumentTypeError("{0} is not the correct file format".format(x))
raise argparse.ArgumentTypeError("""{0} is not the correct
file format""".format(x))
return x
# found on https://stackoverflow.com/questions/14117415/in-python-using-argparse-allow-only-positive-integers
def check_positive(value):
ivalue = int(value)
if ivalue <= 0:
raise argparse.ArgumentTypeError("%s is an invalid positive int value" % value)
return ivalue
"""Check input value is a positive integer.
Args:
value (string): command line parameter
Raises:
argparse.ArgumentTypeError: received a negative integer
argparse.ArgumentTypeError: received a non-integer value
Returns:
integer: integer version of input value
"""
try:
ivalue = round(float(value))
if ivalue <= 0:
raise argparse.ArgumentTypeError(f"""Expected positive integer,
got negative integer: {value}""")
except ValueError as exc:
raise argparse.ArgumentTypeError(f"""Expected positive integer,
got: {value}""") from exc
else:
return ivalue
def check_prob(value):
"""
Check probability value is within ]0,1] range.
Args:
value (string): command line parameter
Raises:
argparse.ArgumentTypeError: received a value outside valid range
Returns:
float: float version of input value
"""
pvalue = float(value)
if pvalue <= 0 or pvalue>1:
raise argparse.ArgumentTypeError("%s is an invalid positive int value" % value)
return pvalue
\ No newline at end of file
if pvalue <= 0 or pvalue > 1:
raise argparse.ArgumentTypeError("""%s is an invalid positive int
value""" % value)
return pvalue
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment