Skip to content
Snippets Groups Projects
Commit f6ebce88 authored by Hugo Madge Leon's avatar Hugo Madge Leon
Browse files
parents 354fcbd4 b8924ea1
No related branches found
No related tags found
1 merge request!46Merged Tanya's files + final lint check
......@@ -59,4 +59,5 @@ Temporary Items
__pycache__/
*_cache
*egg-info/
.coverage
\ No newline at end of file
.coverage
build/
\ No newline at end of file
"""Initialise package."""
"""Receive command line arguments, fragment sequences, and output fragments."""
import argparse
import logging
from pathlib import Path
from Bio import SeqIO # type: ignore
import numpy as np
import pandas as pd # type: ignore
from term_frag_sel.fragmentation import fragmentation
from term_frag_sel.utils import check_positive, check_prob
def main(args: argparse.Namespace):
"""Use CLI arguments to fragment sequences and output text file \
with selected terminal fragments.
Args:
args (parser): list of arguments from CLI.
"""
# Create or wipe output file
with open(args.output, "w", encoding="utf-8") as _:
pass
logger.info("Checking validity of files...")
fasta, seq_counts = file_validation(args.fasta, args.counts, args.sep)
logger.info("Fragmentation of %s...", args.fasta)
fasta_parse = {}
for record in fasta:
fasta_parse[record.id] = record.seq
splits = np.arange(0, len(list(fasta_parse))+args.size, args.size)
for i, split in enumerate(splits):
fasta_dict = fasta_parse[split:splits[i+1]]
term_frags = fragmentation(fasta_dict, seq_counts,
args.mean, args.std,
args.A_prob, args.T_prob,
args.G_prob, args.C_prob)
logger.info("Writing batch %s sequences to %s...", i, args.output)
with open(args.output, 'a', encoding="utf-8") as out_file:
for line in term_frags:
out_file.write(f"{line}\n")
def file_validation(fasta_file: str,
counts_file: str,
sep: str) -> tuple[dict, pd.DataFrame]:
"""Validate input files exist and are the correct format.
Args:
fasta_file (str): Input FASTA file path
counts_file (str): CSV or TSV counts file path
sep (str): Separator for counts file.
Returns:
tuple: fasta and sequence counts variables
"""
with open(fasta_file, "r", encoding="utf-8") as handle:
fasta = SeqIO.parse(handle, "fasta")
if not any(fasta):
raise ValueError("Input FASTA file is either empty or \
incorrect file type.")
count_path = Path(counts_file)
if not count_path.is_file():
logger.exception("Input counts file does not exist or isn't a file.")
else:
if sep == ",":
seq_counts = pd.read_csv(counts_file, names=["seqID", "count"])
else:
seq_counts = pd.read_table(counts_file, names=["seqID", "count"])
return fasta, seq_counts
def parse_arguments() -> argparse.Namespace:
"""Request parameters from user on CLI.
Returns:
argparse.Namespace: object of arguments from CLI.
"""
parser = argparse.ArgumentParser(description="""Takes as input FASTA file
of cDNA sequences, a CSV/TSV with sequence
counts, and mean and std. dev. of fragment
lengths and 4 nucleotide probabilities
for the cuts. Outputs most terminal
fragment (within desired length range)
for each sequence.""")
parser.add_argument('--fasta', required=True,
help="Path to FASTA file with cDNA sequences")
parser.add_argument('--counts', required=True,
help="Path to CSV/TSV file with sequence counts")
parser.add_argument('-o', '--output', required=True,
help="output file path")
parser.add_argument('--mean', required=False, default=300,
type=check_positive,
help="Mean fragment length (default: 10)")
parser.add_argument('--std', required=False, default=60,
type=check_positive,
help="Standard deviation fragment length \
(defafult: 1)")
parser.add_argument('-a', '--A_prob', required=False, default=0.22,
type=check_prob,
help="Probability cut happens after nucleotide A")
parser.add_argument('-t', '--T_prob', required=False, default=0.25,
type=check_prob,
help="Probability cut happens after nucleotide T")
parser.add_argument('-g', '--G_prob', required=False, default=0.25,
type=check_prob,
help="Probability cut happens after nucleotide G")
parser.add_argument('-c', '--C_prob', required=False, default=0.28,
type=check_prob,
help="Probability cut happens after nucleotide C")
parser.add_argument('-s', '--size', required=False, default=10000,
type=check_positive,
help="Chunk size for batch processing")
parser.add_argument('--sep', required=False, default=",",
type=check_positive,
help="Sequence counts file separator.")
args = parser.parse_args()
return args
if __name__ == '__main__':
logging.basicConfig(
format='[%(asctime)s: %(levelname)s] %(message)s \
(module "%(module)s")',
level=logging.INFO,
)
logger = logging.getLogger(__name__)
arguments = parse_arguments()
main(arguments)
"""Fragment sequences."""
import re
import numpy as np
import pandas as pd # type: ignore
def fragmentation(fasta: dict, seq_counts: pd.DataFrame,
mean_length: int = 100, std: int = 10,
a_prob: float = 0.22, t_prob: float = 0.25,
g_prob: float = 0.25, c_prob: float = 0.28
) -> list:
"""Fragment cDNA sequences and select terminal fragment.
Args:
fasta_file (dict): dictionary of {transcript IDs: sequences}
counts_file (pd.DataFrame): dataframe with sequence counts and IDs
mean_length (int): mean length of desired fragments
std (int): standard deviation of desired fragment lengths
a_prob (float): probability of nucleotide A
t_prob (float): probability of nucleotide T
g_prob (float): probability of nucleotide G
c_prob (float): probability of nucleotide C
Returns:
list: list of selected terminal fragments
"""
# calculated using https://www.nature.com/articles/srep04532#MOESM1
nuc_probs = {'A': a_prob, 'T': t_prob, 'G': g_prob, 'C': c_prob}
term_frags = []
for seq_id, seq in fasta.items():
counts = seq_counts[seq_counts["seqID"] == seq_id]["count"]
for _ in range(counts):
n_cuts = int(len(seq)/mean_length)
# non-uniformly random DNA fragmentation implementation based on
# https://www.nature.com/articles/srep04532#Sec1
# assume fragmentation by sonication for NGS workflow
cuts = []
cut_nucs = np.random.choice(list(nuc_probs.keys()),
n_cuts, p=list(nuc_probs.values()))
for nuc in cut_nucs:
nuc_pos = [x.start() for x in re.finditer(nuc, seq)]
pos = np.random.choice(nuc_pos)
while pos in cuts:
pos = np.random.choice(nuc_pos)
cuts.append(pos)
cuts.sort()
cuts.insert(0, 0)
term_frag = ""
for i, val in enumerate(cuts):
if i == len(cuts)-1:
fragment = seq[val+1:cuts[-1]]
else:
fragment = seq[val:cuts[i+1]]
if mean_length-std <= len(fragment) <= mean_length+std:
term_frag = fragment
if term_frag != "":
term_frags.append(term_frag)
return term_frags
"""Utility functions for CLI arguments."""
import argparse
def check_positive(value: str) -> int:
"""Check input value is a positive integer.
Args:
value (str): command line parameter
Raises:
argparse.ArgumentTypeError: received a negative integer
argparse.ArgumentTypeError: received a non-integer value
Returns:
integer: integer version of input value
"""
try:
ivalue = round(float(value))
if ivalue <= 0:
raise argparse.ArgumentTypeError(f"""Expected positive integer,
got negative integer: {value}""")
except ValueError as exc:
raise argparse.ArgumentTypeError(f"""Expected positive integer,
got: {value}""") from exc
else:
return ivalue
def check_prob(value: str) -> float:
"""Check probability value is within ]0,1] range.
Args:
value (str): command line parameter
Raises:
argparse.ArgumentTypeError: received a value outside valid range
Returns:
float: float version of input value
"""
pvalue = float(value)
if pvalue <= 0 or pvalue > 1:
raise argparse.ArgumentTypeError("""Expected a positive float between
0 and 1, but got {value}""")
return pvalue
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment