Skip to content
Snippets Groups Projects

Fixed CLI argument issue

Merged Hugo Madge Leon requested to merge hugo100 into main
8 files
+ 268
69
Compare changes
  • Side-by-side
  • Inline
Files
8
+ 41
24
@@ -10,6 +10,13 @@ import pandas as pd # type: ignore
from term_frag_sel.fragmentation import fragmentation
from term_frag_sel.utils import check_positive, check_prob
logging.basicConfig(
format='[%(asctime)s: %(levelname)s] %(message)s \
(module "%(module)s")',
level=logging.INFO,
)
logger = logging.getLogger("main")
def main(args: argparse.Namespace):
"""Use CLI arguments to fragment sequences and output text file \
@@ -18,6 +25,9 @@ def main(args: argparse.Namespace):
Args:
args (parser): list of arguments from CLI.
"""
if not isinstance(args, argparse.Namespace):
raise TypeError("Input should be argparse.Namespace")
# Create or wipe output file
with open(args.output, "w", encoding="utf-8") as _:
pass
@@ -26,17 +36,16 @@ def main(args: argparse.Namespace):
fasta, seq_counts = file_validation(args.fasta, args.counts, args.sep)
logger.info("Fragmentation of %s...", args.fasta)
fasta_parse = {}
for record in fasta:
fasta_parse[record.id] = record.seq
splits = np.arange(0, len(list(fasta_parse))+args.size, args.size)
splits = np.arange(0, len(list(fasta))+args.size, args.size)
nuc_probs = {'A': args.a_prob, 'T': args.t_prob,
'G': args.g_prob, 'C': args.c_prob}
for i, split in enumerate(splits):
fasta_dict = fasta_parse[split:splits[i+1]]
fasta_dict = fasta[split:splits[i+1]]
term_frags = fragmentation(fasta_dict, seq_counts,
nuc_probs,
args.mean, args.std,
args.A_prob, args.T_prob,
args.G_prob, args.C_prob)
)
logger.info("Writing batch %s sequences to %s...", i, args.output)
with open(args.output, 'a', encoding="utf-8") as out_file:
@@ -55,24 +64,32 @@ def file_validation(fasta_file: str,
sep (str): Separator for counts file.
Returns:
tuple: fasta and sequence counts variables
tuple: fasta dict and sequence counts pd.DataFrame
"""
fasta_dict = {}
with open(fasta_file, "r", encoding="utf-8") as handle:
fasta = SeqIO.parse(handle, "fasta")
if not any(fasta):
raise ValueError("Input FASTA file is either empty or \
incorrect file type.")
fasta_sequences = SeqIO.parse(handle, "fasta")
if not any(fasta_sequences):
raise ValueError("Input FASTA file is either empty or \
incorrect file type.")
for record in fasta_sequences:
fasta_dict[record.id] = str(record.seq).upper()
count_path = Path(counts_file)
if not count_path.is_file():
logger.exception("Input counts file does not exist or isn't a file.")
raise FileNotFoundError("Input counts file does not exist or \
isn't a file.")
if sep == ",":
seq_counts = pd.read_csv(counts_file, names=["seqID", "count"])
seq_counts = seq_counts.astype({"seqID": str})
else:
if sep == ",":
seq_counts = pd.read_csv(counts_file, names=["seqID", "count"])
else:
seq_counts = pd.read_table(counts_file, names=["seqID", "count"])
seq_counts = pd.read_table(counts_file, names=["seqID", "count"])
seq_counts = seq_counts.astype({"seqID": str})
return fasta, seq_counts
return fasta_dict, seq_counts
def parse_arguments() -> argparse.Namespace:
@@ -97,21 +114,21 @@ def parse_arguments() -> argparse.Namespace:
help="output file path")
parser.add_argument('--mean', required=False, default=300,
type=check_positive,
help="Mean fragment length (default: 10)")
help="Mean fragment length (default: 300)")
parser.add_argument('--std', required=False, default=60,
type=check_positive,
help="Standard deviation fragment length \
(defafult: 1)")
parser.add_argument('-a', '--A_prob', required=False, default=0.22,
(defafult: 60)")
parser.add_argument('-a', '--a_prob', required=False, default=0.22,
type=check_prob,
help="Probability cut happens after nucleotide A")
parser.add_argument('-t', '--T_prob', required=False, default=0.25,
parser.add_argument('-t', '--t_prob', required=False, default=0.25,
type=check_prob,
help="Probability cut happens after nucleotide T")
parser.add_argument('-g', '--G_prob', required=False, default=0.25,
parser.add_argument('-g', '--g_prob', required=False, default=0.25,
type=check_prob,
help="Probability cut happens after nucleotide G")
parser.add_argument('-c', '--C_prob', required=False, default=0.28,
parser.add_argument('-c', '--c_prob', required=False, default=0.28,
type=check_prob,
help="Probability cut happens after nucleotide C")
parser.add_argument('-s', '--size', required=False, default=10000,
Loading