Skip to content
Snippets Groups Projects
Commit 591913c9 authored by Hugo Madge Leon's avatar Hugo Madge Leon
Browse files

type hinting + file validation

parent 93ef9e43
No related branches found
No related tags found
2 merge requests!33remove weird setup.py,!32Fix all issues from review except frag theory
This commit is part of merge request !32. Comments created here will be created in the context of that merge request.
...@@ -5,14 +5,16 @@ import numpy as np ...@@ -5,14 +5,16 @@ import numpy as np
import pandas as pd import pandas as pd
def fragmentation(fasta, counts_file, mean_length, std, def fragmentation(fasta: dict, seq_counts: pd.DataFrame,
a_prob, t_prob, g_prob, c_prob): mean_length: int, std: int,
a_prob: float, t_prob: float, g_prob: float, c_prob: float
) -> list:
""" """
Fragment cDNA sequences and select terminal fragment. Fragment cDNA sequences and select terminal fragment.
Args: Args:
fasta_file (fasta): FASTA file with cDNA sequences fasta_file (dict): dictionary of {transcript IDs: sequences}
counts_file (text): CSV or TSV file woth sequence counts counts_file (pd.DataFrame): dataframe with sequence counts and IDs
mean_length (int): mean length of desired fragments mean_length (int): mean length of desired fragments
std (int): standard deviation of desired fragment lengths std (int): standard deviation of desired fragment lengths
a_prob (float): probability of nucleotide A a_prob (float): probability of nucleotide A
...@@ -23,9 +25,6 @@ def fragmentation(fasta, counts_file, mean_length, std, ...@@ -23,9 +25,6 @@ def fragmentation(fasta, counts_file, mean_length, std,
Returns: Returns:
list: list of selected terminal fragments list: list of selected terminal fragments
""" """
seq_counts = pd.read_csv(counts_file,
names=["seqID", "count"])
# calculated using https://www.nature.com/articles/srep04532#MOESM1 # calculated using https://www.nature.com/articles/srep04532#MOESM1
nuc_probs = {'A': a_prob, 'T': t_prob, 'G': g_prob, 'C': c_prob} nuc_probs = {'A': a_prob, 'T': t_prob, 'G': g_prob, 'C': c_prob}
......
"""Receive command line arguments, fragment, and output fragments.""" """Receive command line arguments, fragment sequences, and output fragments."""
import argparse import argparse
import logging import logging
from Bio import SeqIO from Bio import SeqIO
import numpy as np import numpy as np
import pandas as pd
from pathlib import Path
from fragmentation import fragmentation from fragmentation import fragmentation
from utils import check_positive, extant_file, check_prob from utils import check_positive, check_prob
def main(args): def main(args: argparse.Namespace):
""" """
Use CLI arguments to fragment sequences and output text file \ Use CLI arguments to fragment sequences and output text file \
with selected terminal fragments. with selected terminal fragments.
...@@ -16,37 +18,73 @@ def main(args): ...@@ -16,37 +18,73 @@ def main(args):
Args: Args:
args (parser): list of arguments from CLI. args (parser): list of arguments from CLI.
""" """
fasta, seq_counts, output, mean_length, std = args[0:5] fasta_file, counts_file, output, mean_length, std = args[0:5]
a_prob, t_prob, g_prob, c_prob, chunk_size = args[5:] a_prob, t_prob, g_prob, c_prob, chunk_size, sep = args[5:]
# Create or wipe output file # Create or wipe output file
open(output, "w").close() open(output, "w").close()
logger.info(f"Fragmentation of {fasta}") logger.info("Checking validity of files...")
fasta, seq_counts = file_validation(fasta_file, counts_file, sep)
logger.info(f"Fragmentation of {fasta_file}...")
fasta_parse = {} fasta_parse = {}
for record in SeqIO.parse(fasta, "fasta"): for record in fasta:
fasta_parse[record.id] = record.seq fasta_parse[record.id] = record.seq
splits = np.arange(0, len(list(fasta_parse))+chunk_size, chunk_size) splits = np.arange(0, len(list(fasta_parse))+chunk_size, chunk_size)
for i, split in enumerate(splits): for i, split in enumerate(splits):
fasta_dict = fasta_parse[split:splits[i+1]] fasta_dict = fasta_parse[split:splits[i+1]]
term_frags = fragmentation(fasta_dict, seq_counts, output, term_frags = fragmentation(fasta_dict, seq_counts,
mean_length, std, mean_length, std,
a_prob, t_prob, g_prob, c_prob) a_prob, t_prob, g_prob, c_prob)
logger.info(f"Writing batch {i} sequences to {output}") logger.info(f"Writing batch {i} sequences to {output}...")
with open(output, 'a') as f: with open(output, 'a') as f:
for line in term_frags: for line in term_frags:
f.write(f"{line}\n") f.write(f"{line}\n")
def parse_arguments(): def file_validation(fasta_file: str,
counts_file: str,
sep: str) -> tuple[dict, pd.DataFrame]:
"""
Validate input files exist and are the correct format.
Args:
fasta_file (str): Input FASTA file path
counts_file (str): CSV or TSV counts file path
sep (str): Separator for counts file.
Returns:
tuple: fasta and sequence counts variables
"""
with open(fasta_file, "r") as handle:
fasta = SeqIO.parse(handle, "fasta")
try:
any(fasta)
except Exception:
logger.exception("Input FASTA file is either empty or \
incorrect file type.")
count_path = Path(counts_file)
if not count_path.is_file():
logger.exception("Input counts file does not exist or isn't a file.")
else:
if sep == ",":
seq_counts = pd.read_csv(counts_file, names=["seqID", "count"])
else:
seq_counts = pd.read_table(counts_file, names=["seqID", "count"])
return fasta, seq_counts
def parse_arguments() -> argparse.Namespace:
""" """
Request parameters from user on CLI. Request parameters from user on CLI.
Returns: Returns:
parser: list of arguments from CLI. argparse.Namespace: object of arguments from CLI.
""" """
parser = argparse.ArgumentParser(description="""Takes as input FASTA file parser = argparse.ArgumentParser(description="""Takes as input FASTA file
of cDNA sequences, a CSV/TSV with sequence of cDNA sequences, a CSV/TSV with sequence
...@@ -56,11 +94,11 @@ def parse_arguments(): ...@@ -56,11 +94,11 @@ def parse_arguments():
fragment (within desired length range) fragment (within desired length range)
for each sequence.""") for each sequence.""")
parser.add_argument('--fasta', required=True, type=extant_file, parser.add_argument('--fasta', required=True,
help="Path to FASTA file with cDNA sequences") help="Path to FASTA file with cDNA sequences")
parser.add_argument('--counts', required=True, type=extant_file, parser.add_argument('--counts', required=True,
help="Path to CSV/TSV file with sequence counts") help="Path to CSV/TSV file with sequence counts")
parser.add_argument('-o', '--output', required=True, type=extant_file, parser.add_argument('-o', '--output', required=True,
help="output file path") help="output file path")
parser.add_argument('--mean', required=False, default=300, parser.add_argument('--mean', required=False, default=300,
type=check_positive, type=check_positive,
...@@ -84,6 +122,9 @@ def parse_arguments(): ...@@ -84,6 +122,9 @@ def parse_arguments():
parser.add_argument('-s', '--size', required=False, default=10000, parser.add_argument('-s', '--size', required=False, default=10000,
type=check_positive, type=check_positive,
help="Chunk size for batch processing") help="Chunk size for batch processing")
parser.add_argument('--sep', required=False, default=",",
type=check_positive,
help="Sequence counts file separator.")
args = parser.parse_args() args = parser.parse_args()
return args return args
......
"""Utility functions for command line arguments.""" """Utility functions for CLI arguments."""
import argparse import argparse
import os.path
# found on shorturl.at/vzAX4 def check_positive(value: str) -> int:
def extant_file(x):
if not os.path.exists(x):
# Argparse uses the ArgumentTypeError to give a rejection message like:
# error: argument input: x does not exist
raise argparse.ArgumentTypeError("{0} does not exist".format(x))
elif not x.endswith((".fasta", ".fa", ".csv")):
raise argparse.ArgumentTypeError("""{0} is not the correct
file format""".format(x))
return x
def check_positive(value):
"""Check input value is a positive integer. """Check input value is a positive integer.
Args: Args:
value (string): command line parameter value (str): command line parameter
Raises: Raises:
argparse.ArgumentTypeError: received a negative integer argparse.ArgumentTypeError: received a negative integer
...@@ -40,12 +27,12 @@ def check_positive(value): ...@@ -40,12 +27,12 @@ def check_positive(value):
return ivalue return ivalue
def check_prob(value): def check_prob(value: str) -> float:
""" """
Check probability value is within ]0,1] range. Check probability value is within ]0,1] range.
Args: Args:
value (string): command line parameter value (str): command line parameter
Raises: Raises:
argparse.ArgumentTypeError: received a value outside valid range argparse.ArgumentTypeError: received a value outside valid range
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment