From 591913c94be9e9fae73d472a86da43788f530f13 Mon Sep 17 00:00:00 2001 From: "hugo.madgeleon" <hugo.madgeleon@stud.unibas.ch> Date: Fri, 9 Dec 2022 16:18:06 +0100 Subject: [PATCH] type hinting + file validation --- terminal-fragment-selector/fragmentation.py | 13 ++-- terminal-fragment-selector/main.py | 69 ++++++++++++++++----- terminal-fragment-selector/utils.py | 23 ++----- 3 files changed, 66 insertions(+), 39 deletions(-) diff --git a/terminal-fragment-selector/fragmentation.py b/terminal-fragment-selector/fragmentation.py index 6c509c7..f85b1e8 100644 --- a/terminal-fragment-selector/fragmentation.py +++ b/terminal-fragment-selector/fragmentation.py @@ -5,14 +5,16 @@ import numpy as np import pandas as pd -def fragmentation(fasta, counts_file, mean_length, std, - a_prob, t_prob, g_prob, c_prob): +def fragmentation(fasta: dict, seq_counts: pd.DataFrame, + mean_length: int, std: int, + a_prob: float, t_prob: float, g_prob: float, c_prob: float + ) -> list: """ Fragment cDNA sequences and select terminal fragment. Args: - fasta_file (fasta): FASTA file with cDNA sequences - counts_file (text): CSV or TSV file woth sequence counts + fasta_file (dict): dictionary of {transcript IDs: sequences} + counts_file (pd.DataFrame): dataframe with sequence counts and IDs mean_length (int): mean length of desired fragments std (int): standard deviation of desired fragment lengths a_prob (float): probability of nucleotide A @@ -23,9 +25,6 @@ def fragmentation(fasta, counts_file, mean_length, std, Returns: list: list of selected terminal fragments """ - seq_counts = pd.read_csv(counts_file, - names=["seqID", "count"]) - # calculated using https://www.nature.com/articles/srep04532#MOESM1 nuc_probs = {'A': a_prob, 'T': t_prob, 'G': g_prob, 'C': c_prob} diff --git a/terminal-fragment-selector/main.py b/terminal-fragment-selector/main.py index 865705e..73368a1 100644 --- a/terminal-fragment-selector/main.py +++ b/terminal-fragment-selector/main.py @@ -1,14 +1,16 @@ -"""Receive command line arguments, fragment, and output fragments.""" +"""Receive command line arguments, fragment sequences, and output fragments.""" import argparse import logging from Bio import SeqIO import numpy as np +import pandas as pd +from pathlib import Path from fragmentation import fragmentation -from utils import check_positive, extant_file, check_prob +from utils import check_positive, check_prob -def main(args): +def main(args: argparse.Namespace): """ Use CLI arguments to fragment sequences and output text file \ with selected terminal fragments. @@ -16,37 +18,73 @@ def main(args): Args: args (parser): list of arguments from CLI. """ - fasta, seq_counts, output, mean_length, std = args[0:5] - a_prob, t_prob, g_prob, c_prob, chunk_size = args[5:] + fasta_file, counts_file, output, mean_length, std = args[0:5] + a_prob, t_prob, g_prob, c_prob, chunk_size, sep = args[5:] # Create or wipe output file open(output, "w").close() - logger.info(f"Fragmentation of {fasta}") + logger.info("Checking validity of files...") + fasta, seq_counts = file_validation(fasta_file, counts_file, sep) + logger.info(f"Fragmentation of {fasta_file}...") fasta_parse = {} - for record in SeqIO.parse(fasta, "fasta"): + for record in fasta: fasta_parse[record.id] = record.seq splits = np.arange(0, len(list(fasta_parse))+chunk_size, chunk_size) for i, split in enumerate(splits): fasta_dict = fasta_parse[split:splits[i+1]] - term_frags = fragmentation(fasta_dict, seq_counts, output, + term_frags = fragmentation(fasta_dict, seq_counts, mean_length, std, a_prob, t_prob, g_prob, c_prob) - logger.info(f"Writing batch {i} sequences to {output}") + logger.info(f"Writing batch {i} sequences to {output}...") with open(output, 'a') as f: for line in term_frags: f.write(f"{line}\n") -def parse_arguments(): +def file_validation(fasta_file: str, + counts_file: str, + sep: str) -> tuple[dict, pd.DataFrame]: + """ + Validate input files exist and are the correct format. + + Args: + fasta_file (str): Input FASTA file path + counts_file (str): CSV or TSV counts file path + sep (str): Separator for counts file. + + Returns: + tuple: fasta and sequence counts variables + """ + with open(fasta_file, "r") as handle: + fasta = SeqIO.parse(handle, "fasta") + try: + any(fasta) + except Exception: + logger.exception("Input FASTA file is either empty or \ + incorrect file type.") + + count_path = Path(counts_file) + if not count_path.is_file(): + logger.exception("Input counts file does not exist or isn't a file.") + else: + if sep == ",": + seq_counts = pd.read_csv(counts_file, names=["seqID", "count"]) + else: + seq_counts = pd.read_table(counts_file, names=["seqID", "count"]) + + return fasta, seq_counts + + +def parse_arguments() -> argparse.Namespace: """ Request parameters from user on CLI. Returns: - parser: list of arguments from CLI. + argparse.Namespace: object of arguments from CLI. """ parser = argparse.ArgumentParser(description="""Takes as input FASTA file of cDNA sequences, a CSV/TSV with sequence @@ -56,11 +94,11 @@ def parse_arguments(): fragment (within desired length range) for each sequence.""") - parser.add_argument('--fasta', required=True, type=extant_file, + parser.add_argument('--fasta', required=True, help="Path to FASTA file with cDNA sequences") - parser.add_argument('--counts', required=True, type=extant_file, + parser.add_argument('--counts', required=True, help="Path to CSV/TSV file with sequence counts") - parser.add_argument('-o', '--output', required=True, type=extant_file, + parser.add_argument('-o', '--output', required=True, help="output file path") parser.add_argument('--mean', required=False, default=300, type=check_positive, @@ -84,6 +122,9 @@ def parse_arguments(): parser.add_argument('-s', '--size', required=False, default=10000, type=check_positive, help="Chunk size for batch processing") + parser.add_argument('--sep', required=False, default=",", + type=check_positive, + help="Sequence counts file separator.") args = parser.parse_args() return args diff --git a/terminal-fragment-selector/utils.py b/terminal-fragment-selector/utils.py index 0e421ca..a947411 100644 --- a/terminal-fragment-selector/utils.py +++ b/terminal-fragment-selector/utils.py @@ -1,25 +1,12 @@ -"""Utility functions for command line arguments.""" +"""Utility functions for CLI arguments.""" import argparse -import os.path -# found on shorturl.at/vzAX4 -def extant_file(x): - if not os.path.exists(x): - # Argparse uses the ArgumentTypeError to give a rejection message like: - # error: argument input: x does not exist - raise argparse.ArgumentTypeError("{0} does not exist".format(x)) - elif not x.endswith((".fasta", ".fa", ".csv")): - raise argparse.ArgumentTypeError("""{0} is not the correct - file format""".format(x)) - return x - - -def check_positive(value): +def check_positive(value: str) -> int: """Check input value is a positive integer. Args: - value (string): command line parameter + value (str): command line parameter Raises: argparse.ArgumentTypeError: received a negative integer @@ -40,12 +27,12 @@ def check_positive(value): return ivalue -def check_prob(value): +def check_prob(value: str) -> float: """ Check probability value is within ]0,1] range. Args: - value (string): command line parameter + value (str): command line parameter Raises: argparse.ArgumentTypeError: received a value outside valid range -- GitLab