From d3f38575386518350c31b64b7752e9686f0c964d Mon Sep 17 00:00:00 2001 From: Hugo Madge Leon <hugo.madgeleon@stud.unibas.ch> Date: Fri, 9 Dec 2022 15:20:47 +0000 Subject: [PATCH] Fix all issues from review except frag theory --- terminal-fragment-selector/fragmentation.py | 42 ++------ terminal-fragment-selector/main.py | 112 ++++++++++++++++---- terminal-fragment-selector/utils.py | 23 +--- 3 files changed, 103 insertions(+), 74 deletions(-) diff --git a/terminal-fragment-selector/fragmentation.py b/terminal-fragment-selector/fragmentation.py index ff7c73f..f85b1e8 100644 --- a/terminal-fragment-selector/fragmentation.py +++ b/terminal-fragment-selector/fragmentation.py @@ -5,42 +5,16 @@ import numpy as np import pandas as pd -def fasta_process(fasta_file): - """ - Pre-process FASTA file. - - Args: - fasta_file (fasta): FASTA file with cDNA sequences - - Returns: - dict: Dictionary of gene sequence IDs and their sequence - """ - with open(fasta_file, "r") as f: - lines = f.readlines() - - # Tanya, try \\S instead of \S and see if that works - ident_pattern = re.compile('>(\S+)') - seq_pattern = re.compile('^(\S+)$') - - genes = {} - for line in lines: - if ident_pattern.search(line): - seq_id = (ident_pattern.search(line)).group(1) - elif seq_id in genes.keys(): - genes[seq_id] += (seq_pattern.search(line)).group(1) - else: - genes[seq_id] = (seq_pattern.search(line)).group(1) - return genes - - -def fragmentation(fasta_file, counts_file, mean_length, std, - a_prob, t_prob, g_prob, c_prob): +def fragmentation(fasta: dict, seq_counts: pd.DataFrame, + mean_length: int, std: int, + a_prob: float, t_prob: float, g_prob: float, c_prob: float + ) -> list: """ Fragment cDNA sequences and select terminal fragment. Args: - fasta_file (fasta): FASTA file with cDNA sequences - counts_file (text): CSV or TSV file woth sequence counts + fasta_file (dict): dictionary of {transcript IDs: sequences} + counts_file (pd.DataFrame): dataframe with sequence counts and IDs mean_length (int): mean length of desired fragments std (int): standard deviation of desired fragment lengths a_prob (float): probability of nucleotide A @@ -51,10 +25,6 @@ def fragmentation(fasta_file, counts_file, mean_length, std, Returns: list: list of selected terminal fragments """ - fasta = fasta_process(fasta_file) - seq_counts = pd.read_csv(counts_file, - names=["seqID", "count"]) - # calculated using https://www.nature.com/articles/srep04532#MOESM1 nuc_probs = {'A': a_prob, 'T': t_prob, 'G': g_prob, 'C': c_prob} diff --git a/terminal-fragment-selector/main.py b/terminal-fragment-selector/main.py index b91fb2f..73368a1 100644 --- a/terminal-fragment-selector/main.py +++ b/terminal-fragment-selector/main.py @@ -1,11 +1,16 @@ -"""Receive command line arguments, fragment, and output fragments.""" +"""Receive command line arguments, fragment sequences, and output fragments.""" import argparse +import logging +from Bio import SeqIO +import numpy as np +import pandas as pd +from pathlib import Path from fragmentation import fragmentation -from utils import check_positive, extant_file, check_prob +from utils import check_positive, check_prob -def main(args): +def main(args: argparse.Namespace): """ Use CLI arguments to fragment sequences and output text file \ with selected terminal fragments. @@ -13,34 +18,88 @@ def main(args): Args: args (parser): list of arguments from CLI. """ - fasta, seq_counts, mean_length, std, a_prob, t_prob, g_prob, c_prob = args + fasta_file, counts_file, output, mean_length, std = args[0:5] + a_prob, t_prob, g_prob, c_prob, chunk_size, sep = args[5:] - term_frags = fragmentation(fasta, seq_counts, mean_length, std, - a_prob, t_prob, g_prob, c_prob) - with open('terminal_frags.txt', 'w') as f: - for line in term_frags: - f.write(f"{line}\n") + # Create or wipe output file + open(output, "w").close() + logger.info("Checking validity of files...") + fasta, seq_counts = file_validation(fasta_file, counts_file, sep) -def parse_arguments(): + logger.info(f"Fragmentation of {fasta_file}...") + fasta_parse = {} + for record in fasta: + fasta_parse[record.id] = record.seq + splits = np.arange(0, len(list(fasta_parse))+chunk_size, chunk_size) + + for i, split in enumerate(splits): + fasta_dict = fasta_parse[split:splits[i+1]] + term_frags = fragmentation(fasta_dict, seq_counts, + mean_length, std, + a_prob, t_prob, g_prob, c_prob) + + logger.info(f"Writing batch {i} sequences to {output}...") + with open(output, 'a') as f: + for line in term_frags: + f.write(f"{line}\n") + + +def file_validation(fasta_file: str, + counts_file: str, + sep: str) -> tuple[dict, pd.DataFrame]: + """ + Validate input files exist and are the correct format. + + Args: + fasta_file (str): Input FASTA file path + counts_file (str): CSV or TSV counts file path + sep (str): Separator for counts file. + + Returns: + tuple: fasta and sequence counts variables + """ + with open(fasta_file, "r") as handle: + fasta = SeqIO.parse(handle, "fasta") + try: + any(fasta) + except Exception: + logger.exception("Input FASTA file is either empty or \ + incorrect file type.") + + count_path = Path(counts_file) + if not count_path.is_file(): + logger.exception("Input counts file does not exist or isn't a file.") + else: + if sep == ",": + seq_counts = pd.read_csv(counts_file, names=["seqID", "count"]) + else: + seq_counts = pd.read_table(counts_file, names=["seqID", "count"]) + + return fasta, seq_counts + + +def parse_arguments() -> argparse.Namespace: """ Request parameters from user on CLI. Returns: - parser: list of arguments from CLI. + argparse.Namespace: object of arguments from CLI. """ parser = argparse.ArgumentParser(description="""Takes as input FASTA file - of cDNA sequences, a CSV with sequence + of cDNA sequences, a CSV/TSV with sequence counts, and mean and std. dev. of fragment lengths and 4 nucleotide probabilities for the cuts. Outputs most terminal fragment (within desired length range) for each sequence.""") - parser.add_argument('--fasta', required=True, type=extant_file, - help="FASTA file with cDNA sequences") - parser.add_argument('--counts', required=True, type=extant_file, - help="CSV file with sequence counts") + parser.add_argument('--fasta', required=True, + help="Path to FASTA file with cDNA sequences") + parser.add_argument('--counts', required=True, + help="Path to CSV/TSV file with sequence counts") + parser.add_argument('-o', '--output', required=True, + help="output file path") parser.add_argument('--mean', required=False, default=300, type=check_positive, help="Mean fragment length (default: 10)") @@ -48,23 +107,36 @@ def parse_arguments(): type=check_positive, help="Standard deviation fragment length \ (defafult: 1)") - parser.add_argument('--A_prob', required=False, default=0.22, + parser.add_argument('-a', '--A_prob', required=False, default=0.22, type=check_prob, help="Probability cut happens after nucleotide A") - parser.add_argument('--T_prob', required=False, default=0.25, + parser.add_argument('-t', '--T_prob', required=False, default=0.25, type=check_prob, help="Probability cut happens after nucleotide T") - parser.add_argument('--G_prob', required=False, default=0.25, + parser.add_argument('-g', '--G_prob', required=False, default=0.25, type=check_prob, help="Probability cut happens after nucleotide G") - parser.add_argument('--C_prob', required=False, default=0.28, + parser.add_argument('-c', '--C_prob', required=False, default=0.28, type=check_prob, help="Probability cut happens after nucleotide C") + parser.add_argument('-s', '--size', required=False, default=10000, + type=check_positive, + help="Chunk size for batch processing") + parser.add_argument('--sep', required=False, default=",", + type=check_positive, + help="Sequence counts file separator.") args = parser.parse_args() return args if __name__ == '__main__': + logging.basicConfig( + format='[%(asctime)s: %(levelname)s] %(message)s \ + (module "%(module)s")', + level=logging.INFO, + ) + logger = logging.getLogger(__name__) + arguments = parse_arguments() main(arguments) diff --git a/terminal-fragment-selector/utils.py b/terminal-fragment-selector/utils.py index 0e421ca..a947411 100644 --- a/terminal-fragment-selector/utils.py +++ b/terminal-fragment-selector/utils.py @@ -1,25 +1,12 @@ -"""Utility functions for command line arguments.""" +"""Utility functions for CLI arguments.""" import argparse -import os.path -# found on shorturl.at/vzAX4 -def extant_file(x): - if not os.path.exists(x): - # Argparse uses the ArgumentTypeError to give a rejection message like: - # error: argument input: x does not exist - raise argparse.ArgumentTypeError("{0} does not exist".format(x)) - elif not x.endswith((".fasta", ".fa", ".csv")): - raise argparse.ArgumentTypeError("""{0} is not the correct - file format""".format(x)) - return x - - -def check_positive(value): +def check_positive(value: str) -> int: """Check input value is a positive integer. Args: - value (string): command line parameter + value (str): command line parameter Raises: argparse.ArgumentTypeError: received a negative integer @@ -40,12 +27,12 @@ def check_positive(value): return ivalue -def check_prob(value): +def check_prob(value: str) -> float: """ Check probability value is within ]0,1] range. Args: - value (string): command line parameter + value (str): command line parameter Raises: argparse.ArgumentTypeError: received a value outside valid range -- GitLab