Skip to content
Snippets Groups Projects
Commit a6b19bc8 authored by Hugo Madge Leon's avatar Hugo Madge Leon
Browse files
parents 6d6438c2 d3f38575
No related branches found
No related tags found
No related merge requests found
......@@ -52,6 +52,7 @@ Temporary Items
# Ignore all local history of files
.history
.ionide
.vscode
# End of https://www.toptal.com/developers/gitignore/api/macos,visualstudiocode
......
"""Fragment sequences."""
import re
import numpy as np
import pandas as pd
def fasta_process(fasta_file):
with open(fasta_file, "r") as f:
lines = f.readlines()
def fragmentation(fasta: dict, seq_counts: pd.DataFrame,
mean_length: int, std: int,
a_prob: float, t_prob: float, g_prob: float, c_prob: float
) -> list:
"""
Fragment cDNA sequences and select terminal fragment.
ident_pattern = re.compile('>(\S+)')
seq_pattern = re.compile('^(\S+)$')
Args:
fasta_file (dict): dictionary of {transcript IDs: sequences}
counts_file (pd.DataFrame): dataframe with sequence counts and IDs
mean_length (int): mean length of desired fragments
std (int): standard deviation of desired fragment lengths
a_prob (float): probability of nucleotide A
t_prob (float): probability of nucleotide T
g_prob (float): probability of nucleotide G
c_prob (float): probability of nucleotide C
genes = {}
for line in lines:
if ident_pattern.search(line):
seq_id = (ident_pattern.search(line)).group(1)
elif seq_id in genes.keys():
genes[seq_id] += (seq_pattern.search(line)).group(1)
else:
genes[seq_id] = (seq_pattern.search(line)).group(1)
return genes
def fragmentation(fasta_file, counts_file, mean_length, std, a_prob, t_prob, g_prob, c_prob):
fasta = fasta_process(fasta_file)
seq_counts = pd.read_csv(counts_file, names = ["seqID", "count"])
# nucs = ['A','T','G','C']
# mononuc_freqs = [0.22, 0.25, 0.23, 0.30]
nuc_probs = {'A':a_prob, 'T':t_prob, 'G':g_prob, 'C':c_prob} # calculated using https://www.nature.com/articles/srep04532#MOESM1
Returns:
list: list of selected terminal fragments
"""
# calculated using https://www.nature.com/articles/srep04532#MOESM1
nuc_probs = {'A': a_prob, 'T': t_prob, 'G': g_prob, 'C': c_prob}
term_frags = []
term_frags = []
for seq_id, seq in fasta.items():
counts = seq_counts[seq_counts["seqID"] == seq_id]["count"]
for _ in range(counts):
for _ in range(counts):
n_cuts = int(len(seq)/mean_length)
# non-uniformly random DNA fragmentation implementation based on https://www.nature.com/articles/srep04532#Sec1
# non-uniformly random DNA fragmentation implementation based on
# https://www.nature.com/articles/srep04532#Sec1
# assume fragmentation by sonication for NGS workflow
cuts = []
cut_nucs = np.random.choice(list(nuc_probs.keys()), n_cuts, p=list(nuc_probs.values()))
cut_nucs = np.random.choice(list(nuc_probs.keys()),
n_cuts, p=list(nuc_probs.values()))
for nuc in cut_nucs:
nuc_pos = [x.start() for x in re.finditer(nuc, seq)]
pos = np.random.choice(nuc_pos)
......@@ -46,8 +47,8 @@ def fragmentation(fasta_file, counts_file, mean_length, std, a_prob, t_prob, g_p
pos = np.random.choice(nuc_pos)
cuts.append(pos)
cuts.sort()
cuts.insert(0,0)
cuts.sort()
cuts.insert(0, 0)
term_frag = ""
for i, val in enumerate(cuts):
if i == len(cuts)-1:
......@@ -61,4 +62,3 @@ def fragmentation(fasta_file, counts_file, mean_length, std, a_prob, t_prob, g_p
else:
term_frags.append(term_frag)
return term_frags
"""Receive command line arguments, fragment sequences, and output fragments."""
import argparse
import logging
from Bio import SeqIO
import numpy as np
import pandas as pd
from pathlib import Path
from fragmentation_v2 import fragmentation
from utils import check_positive, extant_file, check_prob
from fragmentation import fragmentation
from utils import check_positive, check_prob
def main(args):
fasta, seq_counts, mean_length, std, a_prob, t_prob, g_prob, c_prob = args
def main(args: argparse.Namespace):
"""
Use CLI arguments to fragment sequences and output text file \
with selected terminal fragments.
term_frags = fragmentation(fasta, seq_counts, mean_length, std, a_prob, t_prob, g_prob, c_prob)
with open('terminal_frags.txt', 'w') as f:
for line in term_frags:
f.write(line)
f.write('\n')
Args:
args (parser): list of arguments from CLI.
"""
fasta_file, counts_file, output, mean_length, std = args[0:5]
a_prob, t_prob, g_prob, c_prob, chunk_size, sep = args[5:]
# Parse command-line arguments
def parse_arguments():
parser = argparse.ArgumentParser(description="Takes as input FASTA file of cDNA sequences, a CSV with sequence counts, and mean and std. dev. of fragment lengths and 4 nucleotide probabilities for the cuts. Outputs most terminal fragment (within desired length range) for each sequence.")
# Create or wipe output file
open(output, "w").close()
parser.add_argument('--fasta', required=True, type=extant_file, help="FASTA file with cDNA sequences")
parser.add_argument('--counts', required=True, type=extant_file, help="CSV file with sequence counts")
parser.add_argument('--mean', required = False, default = 300, type = check_positive, help="Mean fragment length (default: 10)")
parser.add_argument('--std', required = False, default = 60, type = check_positive, help="Standard deviation fragment length (defafult: 1)")
parser.add_argument('--A_prob', required=False, default = 0.22, type=check_prob, help="Probability cut happens after nucleotide A")
parser.add_argument('--T_prob', required=False, default = 0.25, type=check_prob, help="Probability cut happens after nucleotide T")
parser.add_argument('--G_prob', required=False, default = 0.25, type=check_prob, help="Probability cut happens after nucleotide G")
parser.add_argument('--C_prob', required=False, default = 0.28, type=check_prob, help="Probability cut happens after nucleotide C")
logger.info("Checking validity of files...")
fasta, seq_counts = file_validation(fasta_file, counts_file, sep)
logger.info(f"Fragmentation of {fasta_file}...")
fasta_parse = {}
for record in fasta:
fasta_parse[record.id] = record.seq
splits = np.arange(0, len(list(fasta_parse))+chunk_size, chunk_size)
for i, split in enumerate(splits):
fasta_dict = fasta_parse[split:splits[i+1]]
term_frags = fragmentation(fasta_dict, seq_counts,
mean_length, std,
a_prob, t_prob, g_prob, c_prob)
logger.info(f"Writing batch {i} sequences to {output}...")
with open(output, 'a') as f:
for line in term_frags:
f.write(f"{line}\n")
def file_validation(fasta_file: str,
counts_file: str,
sep: str) -> tuple[dict, pd.DataFrame]:
"""
Validate input files exist and are the correct format.
Args:
fasta_file (str): Input FASTA file path
counts_file (str): CSV or TSV counts file path
sep (str): Separator for counts file.
Returns:
tuple: fasta and sequence counts variables
"""
with open(fasta_file, "r") as handle:
fasta = SeqIO.parse(handle, "fasta")
try:
any(fasta)
except Exception:
logger.exception("Input FASTA file is either empty or \
incorrect file type.")
count_path = Path(counts_file)
if not count_path.is_file():
logger.exception("Input counts file does not exist or isn't a file.")
else:
if sep == ",":
seq_counts = pd.read_csv(counts_file, names=["seqID", "count"])
else:
seq_counts = pd.read_table(counts_file, names=["seqID", "count"])
return fasta, seq_counts
def parse_arguments() -> argparse.Namespace:
"""
Request parameters from user on CLI.
Returns:
argparse.Namespace: object of arguments from CLI.
"""
parser = argparse.ArgumentParser(description="""Takes as input FASTA file
of cDNA sequences, a CSV/TSV with sequence
counts, and mean and std. dev. of fragment
lengths and 4 nucleotide probabilities
for the cuts. Outputs most terminal
fragment (within desired length range)
for each sequence.""")
parser.add_argument('--fasta', required=True,
help="Path to FASTA file with cDNA sequences")
parser.add_argument('--counts', required=True,
help="Path to CSV/TSV file with sequence counts")
parser.add_argument('-o', '--output', required=True,
help="output file path")
parser.add_argument('--mean', required=False, default=300,
type=check_positive,
help="Mean fragment length (default: 10)")
parser.add_argument('--std', required=False, default=60,
type=check_positive,
help="Standard deviation fragment length \
(defafult: 1)")
parser.add_argument('-a', '--A_prob', required=False, default=0.22,
type=check_prob,
help="Probability cut happens after nucleotide A")
parser.add_argument('-t', '--T_prob', required=False, default=0.25,
type=check_prob,
help="Probability cut happens after nucleotide T")
parser.add_argument('-g', '--G_prob', required=False, default=0.25,
type=check_prob,
help="Probability cut happens after nucleotide G")
parser.add_argument('-c', '--C_prob', required=False, default=0.28,
type=check_prob,
help="Probability cut happens after nucleotide C")
parser.add_argument('-s', '--size', required=False, default=10000,
type=check_positive,
help="Chunk size for batch processing")
parser.add_argument('--sep', required=False, default=",",
type=check_positive,
help="Sequence counts file separator.")
args = parser.parse_args()
return args
if __name__ == '__main__':
logging.basicConfig(
format='[%(asctime)s: %(levelname)s] %(message)s \
(module "%(module)s")',
level=logging.INFO,
)
logger = logging.getLogger(__name__)
arguments = parse_arguments()
main(arguments)
\ No newline at end of file
main(arguments)
"""Utility functions for CLI arguments."""
import argparse
import os.path
# found on https://stackoverflow.com/questions/11540854/file-as-command-line-argument-for-argparse-error-message-if-argument-is-not-va
def extant_file(x):
def check_positive(value: str) -> int:
"""Check input value is a positive integer.
Args:
value (str): command line parameter
Raises:
argparse.ArgumentTypeError: received a negative integer
argparse.ArgumentTypeError: received a non-integer value
Returns:
integer: integer version of input value
"""
'Type' for argparse - checks that file exists but does not open.
try:
ivalue = round(float(value))
if ivalue <= 0:
raise argparse.ArgumentTypeError(f"""Expected positive integer,
got negative integer: {value}""")
except ValueError as exc:
raise argparse.ArgumentTypeError(f"""Expected positive integer,
got: {value}""") from exc
else:
return ivalue
def check_prob(value: str) -> float:
"""
Check probability value is within ]0,1] range.
Args:
value (str): command line parameter
Raises:
argparse.ArgumentTypeError: received a value outside valid range
Returns:
float: float version of input value
"""
if not os.path.exists(x):
# Argparse uses the ArgumentTypeError to give a rejection message like:
# error: argument input: x does not exist
raise argparse.ArgumentTypeError("{0} does not exist".format(x))
elif not x.endswith((".fasta", ".fa", ".csv")):
raise argparse.ArgumentTypeError("{0} is not the correct file format".format(x))
return x
# found on https://stackoverflow.com/questions/14117415/in-python-using-argparse-allow-only-positive-integers
def check_positive(value):
ivalue = int(value)
if ivalue <= 0:
raise argparse.ArgumentTypeError("%s is an invalid positive int value" % value)
return ivalue
def check_prob(value):
pvalue = float(value)
if pvalue <= 0 or pvalue>1:
raise argparse.ArgumentTypeError("%s is an invalid positive int value" % value)
return pvalue
\ No newline at end of file
if pvalue <= 0 or pvalue > 1:
raise argparse.ArgumentTypeError("""%s is an invalid positive int
value""" % value)
return pvalue
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment