Skip to content
Snippets Groups Projects
Commit bed702e5 authored by Hugo Madge Leon's avatar Hugo Madge Leon
Browse files

remove build files

parent af26d02f
No related branches found
No related tags found
1 merge request!42Remove build files
Pipeline #14868 failed
......@@ -59,4 +59,5 @@ Temporary Items
__pycache__/
*_cache
*egg-info/
.coverage
\ No newline at end of file
.coverage
build/
\ No newline at end of file
"""Initialise package."""
"""Receive command line arguments, fragment sequences, and output fragments."""
import argparse
import logging
from pathlib import Path
from Bio import SeqIO # type: ignore
import numpy as np
import pandas as pd # type: ignore
from term_frag_sel.fragmentation import fragmentation
from term_frag_sel.utils import check_positive, check_prob
def main(args: argparse.Namespace):
"""Use CLI arguments to fragment sequences and output text file \
with selected terminal fragments.
Args:
args (parser): list of arguments from CLI.
"""
# Create or wipe output file
with open(args.output, "w", encoding="utf-8") as _:
pass
logger.info("Checking validity of files...")
fasta, seq_counts = file_validation(args.fasta, args.counts, args.sep)
logger.info("Fragmentation of %s...", args.fasta)
fasta_parse = {}
for record in fasta:
fasta_parse[record.id] = record.seq
splits = np.arange(0, len(list(fasta_parse))+args.size, args.size)
for i, split in enumerate(splits):
fasta_dict = fasta_parse[split:splits[i+1]]
term_frags = fragmentation(fasta_dict, seq_counts,
args.mean, args.std,
args.A_prob, args.T_prob,
args.G_prob, args.C_prob)
logger.info("Writing batch %s sequences to %s...", i, args.output)
with open(args.output, 'a', encoding="utf-8") as out_file:
for line in term_frags:
out_file.write(f"{line}\n")
def file_validation(fasta_file: str,
counts_file: str,
sep: str) -> tuple[dict, pd.DataFrame]:
"""Validate input files exist and are the correct format.
Args:
fasta_file (str): Input FASTA file path
counts_file (str): CSV or TSV counts file path
sep (str): Separator for counts file.
Returns:
tuple: fasta and sequence counts variables
"""
with open(fasta_file, "r", encoding="utf-8") as handle:
fasta = SeqIO.parse(handle, "fasta")
if not any(fasta):
raise ValueError("Input FASTA file is either empty or \
incorrect file type.")
count_path = Path(counts_file)
if not count_path.is_file():
logger.exception("Input counts file does not exist or isn't a file.")
else:
if sep == ",":
seq_counts = pd.read_csv(counts_file, names=["seqID", "count"])
else:
seq_counts = pd.read_table(counts_file, names=["seqID", "count"])
return fasta, seq_counts
def parse_arguments() -> argparse.Namespace:
"""Request parameters from user on CLI.
Returns:
argparse.Namespace: object of arguments from CLI.
"""
parser = argparse.ArgumentParser(description="""Takes as input FASTA file
of cDNA sequences, a CSV/TSV with sequence
counts, and mean and std. dev. of fragment
lengths and 4 nucleotide probabilities
for the cuts. Outputs most terminal
fragment (within desired length range)
for each sequence.""")
parser.add_argument('--fasta', required=True,
help="Path to FASTA file with cDNA sequences")
parser.add_argument('--counts', required=True,
help="Path to CSV/TSV file with sequence counts")
parser.add_argument('-o', '--output', required=True,
help="output file path")
parser.add_argument('--mean', required=False, default=300,
type=check_positive,
help="Mean fragment length (default: 10)")
parser.add_argument('--std', required=False, default=60,
type=check_positive,
help="Standard deviation fragment length \
(defafult: 1)")
parser.add_argument('-a', '--A_prob', required=False, default=0.22,
type=check_prob,
help="Probability cut happens after nucleotide A")
parser.add_argument('-t', '--T_prob', required=False, default=0.25,
type=check_prob,
help="Probability cut happens after nucleotide T")
parser.add_argument('-g', '--G_prob', required=False, default=0.25,
type=check_prob,
help="Probability cut happens after nucleotide G")
parser.add_argument('-c', '--C_prob', required=False, default=0.28,
type=check_prob,
help="Probability cut happens after nucleotide C")
parser.add_argument('-s', '--size', required=False, default=10000,
type=check_positive,
help="Chunk size for batch processing")
parser.add_argument('--sep', required=False, default=",",
type=check_positive,
help="Sequence counts file separator.")
args = parser.parse_args()
return args
if __name__ == '__main__':
logging.basicConfig(
format='[%(asctime)s: %(levelname)s] %(message)s \
(module "%(module)s")',
level=logging.INFO,
)
logger = logging.getLogger(__name__)
arguments = parse_arguments()
main(arguments)
"""Fragment sequences."""
import re
import numpy as np
import pandas as pd # type: ignore
def fragmentation(fasta: dict, seq_counts: pd.DataFrame,
mean_length: int = 100, std: int = 10,
a_prob: float = 0.22, t_prob: float = 0.25,
g_prob: float = 0.25, c_prob: float = 0.28
) -> list:
"""Fragment cDNA sequences and select terminal fragment.
Args:
fasta_file (dict): dictionary of {transcript IDs: sequences}
counts_file (pd.DataFrame): dataframe with sequence counts and IDs
mean_length (int): mean length of desired fragments
std (int): standard deviation of desired fragment lengths
a_prob (float): probability of nucleotide A
t_prob (float): probability of nucleotide T
g_prob (float): probability of nucleotide G
c_prob (float): probability of nucleotide C
Returns:
list: list of selected terminal fragments
"""
# calculated using https://www.nature.com/articles/srep04532#MOESM1
nuc_probs = {'A': a_prob, 'T': t_prob, 'G': g_prob, 'C': c_prob}
term_frags = []
for seq_id, seq in fasta.items():
counts = seq_counts[seq_counts["seqID"] == seq_id]["count"]
for _ in range(counts):
n_cuts = int(len(seq)/mean_length)
# non-uniformly random DNA fragmentation implementation based on
# https://www.nature.com/articles/srep04532#Sec1
# assume fragmentation by sonication for NGS workflow
cuts = []
cut_nucs = np.random.choice(list(nuc_probs.keys()),
n_cuts, p=list(nuc_probs.values()))
for nuc in cut_nucs:
nuc_pos = [x.start() for x in re.finditer(nuc, seq)]
pos = np.random.choice(nuc_pos)
while pos in cuts:
pos = np.random.choice(nuc_pos)
cuts.append(pos)
cuts.sort()
cuts.insert(0, 0)
term_frag = ""
for i, val in enumerate(cuts):
if i == len(cuts)-1:
fragment = seq[val+1:cuts[-1]]
else:
fragment = seq[val:cuts[i+1]]
if mean_length-std <= len(fragment) <= mean_length+std:
term_frag = fragment
if term_frag != "":
term_frags.append(term_frag)
return term_frags
"""Utility functions for CLI arguments."""
import argparse
def check_positive(value: str) -> int:
"""Check input value is a positive integer.
Args:
value (str): command line parameter
Raises:
argparse.ArgumentTypeError: received a negative integer
argparse.ArgumentTypeError: received a non-integer value
Returns:
integer: integer version of input value
"""
try:
ivalue = round(float(value))
if ivalue <= 0:
raise argparse.ArgumentTypeError(f"""Expected positive integer,
got negative integer: {value}""")
except ValueError as exc:
raise argparse.ArgumentTypeError(f"""Expected positive integer,
got: {value}""") from exc
else:
return ivalue
def check_prob(value: str) -> float:
"""Check probability value is within ]0,1] range.
Args:
value (str): command line parameter
Raises:
argparse.ArgumentTypeError: received a value outside valid range
Returns:
float: float version of input value
"""
pvalue = float(value)
if pvalue <= 0 or pvalue > 1:
raise argparse.ArgumentTypeError("""Expected a positive float between
0 and 1, but got {value}""")
return pvalue
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment