diff --git a/.gitignore b/.gitignore index fa5a841c65c670c06742134464f82fce2ee43aae..510b9ee493d9c985939497b3b784347cbfb27bc0 100644 --- a/.gitignore +++ b/.gitignore @@ -52,6 +52,7 @@ Temporary Items # Ignore all local history of files .history .ionide +.vscode # End of https://www.toptal.com/developers/gitignore/api/macos,visualstudiocode diff --git a/terminal-fragment-selector/fragmentation.py b/terminal-fragment-selector/fragmentation.py index 6a1e68a83728194a3aab9f0e156c3723a4c10d40..f85b1e82c68b80e7417278c87477f0ee24248ae3 100644 --- a/terminal-fragment-selector/fragmentation.py +++ b/terminal-fragment-selector/fragmentation.py @@ -1,44 +1,45 @@ +"""Fragment sequences.""" import re import numpy as np import pandas as pd -def fasta_process(fasta_file): - with open(fasta_file, "r") as f: - lines = f.readlines() +def fragmentation(fasta: dict, seq_counts: pd.DataFrame, + mean_length: int, std: int, + a_prob: float, t_prob: float, g_prob: float, c_prob: float + ) -> list: + """ + Fragment cDNA sequences and select terminal fragment. - ident_pattern = re.compile('>(\S+)') - seq_pattern = re.compile('^(\S+)$') + Args: + fasta_file (dict): dictionary of {transcript IDs: sequences} + counts_file (pd.DataFrame): dataframe with sequence counts and IDs + mean_length (int): mean length of desired fragments + std (int): standard deviation of desired fragment lengths + a_prob (float): probability of nucleotide A + t_prob (float): probability of nucleotide T + g_prob (float): probability of nucleotide G + c_prob (float): probability of nucleotide C - genes = {} - for line in lines: - if ident_pattern.search(line): - seq_id = (ident_pattern.search(line)).group(1) - elif seq_id in genes.keys(): - genes[seq_id] += (seq_pattern.search(line)).group(1) - else: - genes[seq_id] = (seq_pattern.search(line)).group(1) - return genes - -def fragmentation(fasta_file, counts_file, mean_length, std, a_prob, t_prob, g_prob, c_prob): - fasta = fasta_process(fasta_file) - seq_counts = pd.read_csv(counts_file, names = ["seqID", "count"]) - - # nucs = ['A','T','G','C'] - # mononuc_freqs = [0.22, 0.25, 0.23, 0.30] - nuc_probs = {'A':a_prob, 'T':t_prob, 'G':g_prob, 'C':c_prob} # calculated using https://www.nature.com/articles/srep04532#MOESM1 + Returns: + list: list of selected terminal fragments + """ + # calculated using https://www.nature.com/articles/srep04532#MOESM1 + nuc_probs = {'A': a_prob, 'T': t_prob, 'G': g_prob, 'C': c_prob} - term_frags = [] + term_frags = [] for seq_id, seq in fasta.items(): counts = seq_counts[seq_counts["seqID"] == seq_id]["count"] - for _ in range(counts): + for _ in range(counts): n_cuts = int(len(seq)/mean_length) - - # non-uniformly random DNA fragmentation implementation based on https://www.nature.com/articles/srep04532#Sec1 + + # non-uniformly random DNA fragmentation implementation based on + # https://www.nature.com/articles/srep04532#Sec1 # assume fragmentation by sonication for NGS workflow cuts = [] - cut_nucs = np.random.choice(list(nuc_probs.keys()), n_cuts, p=list(nuc_probs.values())) + cut_nucs = np.random.choice(list(nuc_probs.keys()), + n_cuts, p=list(nuc_probs.values())) for nuc in cut_nucs: nuc_pos = [x.start() for x in re.finditer(nuc, seq)] pos = np.random.choice(nuc_pos) @@ -46,8 +47,8 @@ def fragmentation(fasta_file, counts_file, mean_length, std, a_prob, t_prob, g_p pos = np.random.choice(nuc_pos) cuts.append(pos) - cuts.sort() - cuts.insert(0,0) + cuts.sort() + cuts.insert(0, 0) term_frag = "" for i, val in enumerate(cuts): if i == len(cuts)-1: @@ -61,4 +62,3 @@ def fragmentation(fasta_file, counts_file, mean_length, std, a_prob, t_prob, g_p else: term_frags.append(term_frag) return term_frags - diff --git a/terminal-fragment-selector/main.py b/terminal-fragment-selector/main.py index 0088de46e17fdd97a4a52c2d33aee373c85c2b86..73368a1ae40f4121d773731a47c9b25d5127cdd0 100644 --- a/terminal-fragment-selector/main.py +++ b/terminal-fragment-selector/main.py @@ -1,35 +1,142 @@ +"""Receive command line arguments, fragment sequences, and output fragments.""" import argparse +import logging +from Bio import SeqIO +import numpy as np +import pandas as pd +from pathlib import Path -from fragmentation_v2 import fragmentation -from utils import check_positive, extant_file, check_prob +from fragmentation import fragmentation +from utils import check_positive, check_prob -def main(args): - fasta, seq_counts, mean_length, std, a_prob, t_prob, g_prob, c_prob = args +def main(args: argparse.Namespace): + """ + Use CLI arguments to fragment sequences and output text file \ + with selected terminal fragments. - term_frags = fragmentation(fasta, seq_counts, mean_length, std, a_prob, t_prob, g_prob, c_prob) - with open('terminal_frags.txt', 'w') as f: - for line in term_frags: - f.write(line) - f.write('\n') + Args: + args (parser): list of arguments from CLI. + """ + fasta_file, counts_file, output, mean_length, std = args[0:5] + a_prob, t_prob, g_prob, c_prob, chunk_size, sep = args[5:] -# Parse command-line arguments -def parse_arguments(): - parser = argparse.ArgumentParser(description="Takes as input FASTA file of cDNA sequences, a CSV with sequence counts, and mean and std. dev. of fragment lengths and 4 nucleotide probabilities for the cuts. Outputs most terminal fragment (within desired length range) for each sequence.") + # Create or wipe output file + open(output, "w").close() - parser.add_argument('--fasta', required=True, type=extant_file, help="FASTA file with cDNA sequences") - parser.add_argument('--counts', required=True, type=extant_file, help="CSV file with sequence counts") - parser.add_argument('--mean', required = False, default = 300, type = check_positive, help="Mean fragment length (default: 10)") - parser.add_argument('--std', required = False, default = 60, type = check_positive, help="Standard deviation fragment length (defafult: 1)") - parser.add_argument('--A_prob', required=False, default = 0.22, type=check_prob, help="Probability cut happens after nucleotide A") - parser.add_argument('--T_prob', required=False, default = 0.25, type=check_prob, help="Probability cut happens after nucleotide T") - parser.add_argument('--G_prob', required=False, default = 0.25, type=check_prob, help="Probability cut happens after nucleotide G") - parser.add_argument('--C_prob', required=False, default = 0.28, type=check_prob, help="Probability cut happens after nucleotide C") + logger.info("Checking validity of files...") + fasta, seq_counts = file_validation(fasta_file, counts_file, sep) + + logger.info(f"Fragmentation of {fasta_file}...") + fasta_parse = {} + for record in fasta: + fasta_parse[record.id] = record.seq + splits = np.arange(0, len(list(fasta_parse))+chunk_size, chunk_size) + + for i, split in enumerate(splits): + fasta_dict = fasta_parse[split:splits[i+1]] + term_frags = fragmentation(fasta_dict, seq_counts, + mean_length, std, + a_prob, t_prob, g_prob, c_prob) + + logger.info(f"Writing batch {i} sequences to {output}...") + with open(output, 'a') as f: + for line in term_frags: + f.write(f"{line}\n") + + +def file_validation(fasta_file: str, + counts_file: str, + sep: str) -> tuple[dict, pd.DataFrame]: + """ + Validate input files exist and are the correct format. + + Args: + fasta_file (str): Input FASTA file path + counts_file (str): CSV or TSV counts file path + sep (str): Separator for counts file. + + Returns: + tuple: fasta and sequence counts variables + """ + with open(fasta_file, "r") as handle: + fasta = SeqIO.parse(handle, "fasta") + try: + any(fasta) + except Exception: + logger.exception("Input FASTA file is either empty or \ + incorrect file type.") + + count_path = Path(counts_file) + if not count_path.is_file(): + logger.exception("Input counts file does not exist or isn't a file.") + else: + if sep == ",": + seq_counts = pd.read_csv(counts_file, names=["seqID", "count"]) + else: + seq_counts = pd.read_table(counts_file, names=["seqID", "count"]) + + return fasta, seq_counts + + +def parse_arguments() -> argparse.Namespace: + """ + Request parameters from user on CLI. + + Returns: + argparse.Namespace: object of arguments from CLI. + """ + parser = argparse.ArgumentParser(description="""Takes as input FASTA file + of cDNA sequences, a CSV/TSV with sequence + counts, and mean and std. dev. of fragment + lengths and 4 nucleotide probabilities + for the cuts. Outputs most terminal + fragment (within desired length range) + for each sequence.""") + + parser.add_argument('--fasta', required=True, + help="Path to FASTA file with cDNA sequences") + parser.add_argument('--counts', required=True, + help="Path to CSV/TSV file with sequence counts") + parser.add_argument('-o', '--output', required=True, + help="output file path") + parser.add_argument('--mean', required=False, default=300, + type=check_positive, + help="Mean fragment length (default: 10)") + parser.add_argument('--std', required=False, default=60, + type=check_positive, + help="Standard deviation fragment length \ + (defafult: 1)") + parser.add_argument('-a', '--A_prob', required=False, default=0.22, + type=check_prob, + help="Probability cut happens after nucleotide A") + parser.add_argument('-t', '--T_prob', required=False, default=0.25, + type=check_prob, + help="Probability cut happens after nucleotide T") + parser.add_argument('-g', '--G_prob', required=False, default=0.25, + type=check_prob, + help="Probability cut happens after nucleotide G") + parser.add_argument('-c', '--C_prob', required=False, default=0.28, + type=check_prob, + help="Probability cut happens after nucleotide C") + parser.add_argument('-s', '--size', required=False, default=10000, + type=check_positive, + help="Chunk size for batch processing") + parser.add_argument('--sep', required=False, default=",", + type=check_positive, + help="Sequence counts file separator.") args = parser.parse_args() - + return args if __name__ == '__main__': + logging.basicConfig( + format='[%(asctime)s: %(levelname)s] %(message)s \ + (module "%(module)s")', + level=logging.INFO, + ) + logger = logging.getLogger(__name__) + arguments = parse_arguments() - main(arguments) \ No newline at end of file + main(arguments) diff --git a/terminal-fragment-selector/utils.py b/terminal-fragment-selector/utils.py index aced683d70d2d8ad1f99824062143f533efe035f..a9474118248a251d7f2c300772f4bcf35e66721b 100644 --- a/terminal-fragment-selector/utils.py +++ b/terminal-fragment-selector/utils.py @@ -1,29 +1,47 @@ +"""Utility functions for CLI arguments.""" import argparse -import os.path -# found on https://stackoverflow.com/questions/11540854/file-as-command-line-argument-for-argparse-error-message-if-argument-is-not-va -def extant_file(x): +def check_positive(value: str) -> int: + """Check input value is a positive integer. + + Args: + value (str): command line parameter + + Raises: + argparse.ArgumentTypeError: received a negative integer + argparse.ArgumentTypeError: received a non-integer value + + Returns: + integer: integer version of input value """ - 'Type' for argparse - checks that file exists but does not open. + try: + ivalue = round(float(value)) + if ivalue <= 0: + raise argparse.ArgumentTypeError(f"""Expected positive integer, + got negative integer: {value}""") + except ValueError as exc: + raise argparse.ArgumentTypeError(f"""Expected positive integer, + got: {value}""") from exc + else: + return ivalue + + +def check_prob(value: str) -> float: + """ + Check probability value is within ]0,1] range. + + Args: + value (str): command line parameter + + Raises: + argparse.ArgumentTypeError: received a value outside valid range + + Returns: + float: float version of input value """ - if not os.path.exists(x): - # Argparse uses the ArgumentTypeError to give a rejection message like: - # error: argument input: x does not exist - raise argparse.ArgumentTypeError("{0} does not exist".format(x)) - elif not x.endswith((".fasta", ".fa", ".csv")): - raise argparse.ArgumentTypeError("{0} is not the correct file format".format(x)) - return x - -# found on https://stackoverflow.com/questions/14117415/in-python-using-argparse-allow-only-positive-integers -def check_positive(value): - ivalue = int(value) - if ivalue <= 0: - raise argparse.ArgumentTypeError("%s is an invalid positive int value" % value) - return ivalue - -def check_prob(value): pvalue = float(value) - if pvalue <= 0 or pvalue>1: - raise argparse.ArgumentTypeError("%s is an invalid positive int value" % value) - return pvalue \ No newline at end of file + if pvalue <= 0 or pvalue > 1: + raise argparse.ArgumentTypeError("""%s is an invalid positive int + value""" % value) + return pvalue