Skip to content
Snippets Groups Projects
Commit ea5026a4 authored by Hugo Madge Leon's avatar Hugo Madge Leon
Browse files

Merged Tanya's files + final lint check

parent b8924ea1
No related branches found
No related tags found
1 merge request!46Merged Tanya's files + final lint check
Pipeline #14874 failed
...@@ -60,4 +60,5 @@ __pycache__/ ...@@ -60,4 +60,5 @@ __pycache__/
*_cache *_cache
*egg-info/ *egg-info/
.coverage .coverage
build/ build/
\ No newline at end of file */play.py
\ No newline at end of file
...@@ -10,6 +10,13 @@ import pandas as pd # type: ignore ...@@ -10,6 +10,13 @@ import pandas as pd # type: ignore
from term_frag_sel.fragmentation import fragmentation from term_frag_sel.fragmentation import fragmentation
from term_frag_sel.utils import check_positive, check_prob from term_frag_sel.utils import check_positive, check_prob
logging.basicConfig(
format='[%(asctime)s: %(levelname)s] %(message)s \
(module "%(module)s")',
level=logging.INFO,
)
logger = logging.getLogger("main")
def main(args: argparse.Namespace): def main(args: argparse.Namespace):
"""Use CLI arguments to fragment sequences and output text file \ """Use CLI arguments to fragment sequences and output text file \
...@@ -18,6 +25,9 @@ def main(args: argparse.Namespace): ...@@ -18,6 +25,9 @@ def main(args: argparse.Namespace):
Args: Args:
args (parser): list of arguments from CLI. args (parser): list of arguments from CLI.
""" """
if not isinstance(args, argparse.Namespace):
raise TypeError("Input should be argparse.Namespace")
# Create or wipe output file # Create or wipe output file
with open(args.output, "w", encoding="utf-8") as _: with open(args.output, "w", encoding="utf-8") as _:
pass pass
...@@ -26,17 +36,16 @@ def main(args: argparse.Namespace): ...@@ -26,17 +36,16 @@ def main(args: argparse.Namespace):
fasta, seq_counts = file_validation(args.fasta, args.counts, args.sep) fasta, seq_counts = file_validation(args.fasta, args.counts, args.sep)
logger.info("Fragmentation of %s...", args.fasta) logger.info("Fragmentation of %s...", args.fasta)
fasta_parse = {} splits = np.arange(0, len(list(fasta))+args.size, args.size)
for record in fasta:
fasta_parse[record.id] = record.seq
splits = np.arange(0, len(list(fasta_parse))+args.size, args.size)
nuc_probs = {'A': args.a_prob, 'T': args.t_prob,
'G': args.g_prob, 'C': args.c_prob}
for i, split in enumerate(splits): for i, split in enumerate(splits):
fasta_dict = fasta_parse[split:splits[i+1]] fasta_dict = fasta[split:splits[i+1]]
term_frags = fragmentation(fasta_dict, seq_counts, term_frags = fragmentation(fasta_dict, seq_counts,
nuc_probs,
args.mean, args.std, args.mean, args.std,
args.A_prob, args.T_prob, )
args.G_prob, args.C_prob)
logger.info("Writing batch %s sequences to %s...", i, args.output) logger.info("Writing batch %s sequences to %s...", i, args.output)
with open(args.output, 'a', encoding="utf-8") as out_file: with open(args.output, 'a', encoding="utf-8") as out_file:
...@@ -55,24 +64,32 @@ def file_validation(fasta_file: str, ...@@ -55,24 +64,32 @@ def file_validation(fasta_file: str,
sep (str): Separator for counts file. sep (str): Separator for counts file.
Returns: Returns:
tuple: fasta and sequence counts variables tuple: fasta dict and sequence counts pd.DataFrame
""" """
fasta_dict = {}
with open(fasta_file, "r", encoding="utf-8") as handle: with open(fasta_file, "r", encoding="utf-8") as handle:
fasta = SeqIO.parse(handle, "fasta") fasta_sequences = SeqIO.parse(handle, "fasta")
if not any(fasta):
raise ValueError("Input FASTA file is either empty or \ if not any(fasta_sequences):
incorrect file type.") raise ValueError("Input FASTA file is either empty or \
incorrect file type.")
for record in fasta_sequences:
fasta_dict[record.id] = str(record.seq).upper()
count_path = Path(counts_file) count_path = Path(counts_file)
if not count_path.is_file(): if not count_path.is_file():
logger.exception("Input counts file does not exist or isn't a file.") raise FileNotFoundError("Input counts file does not exist or \
isn't a file.")
if sep == ",":
seq_counts = pd.read_csv(counts_file, names=["seqID", "count"])
seq_counts = seq_counts.astype({"seqID": str})
else: else:
if sep == ",": seq_counts = pd.read_table(counts_file, names=["seqID", "count"])
seq_counts = pd.read_csv(counts_file, names=["seqID", "count"]) seq_counts = seq_counts.astype({"seqID": str})
else:
seq_counts = pd.read_table(counts_file, names=["seqID", "count"])
return fasta, seq_counts return fasta_dict, seq_counts
def parse_arguments() -> argparse.Namespace: def parse_arguments() -> argparse.Namespace:
...@@ -97,21 +114,21 @@ def parse_arguments() -> argparse.Namespace: ...@@ -97,21 +114,21 @@ def parse_arguments() -> argparse.Namespace:
help="output file path") help="output file path")
parser.add_argument('--mean', required=False, default=300, parser.add_argument('--mean', required=False, default=300,
type=check_positive, type=check_positive,
help="Mean fragment length (default: 10)") help="Mean fragment length (default: 300)")
parser.add_argument('--std', required=False, default=60, parser.add_argument('--std', required=False, default=60,
type=check_positive, type=check_positive,
help="Standard deviation fragment length \ help="Standard deviation fragment length \
(defafult: 1)") (defafult: 60)")
parser.add_argument('-a', '--A_prob', required=False, default=0.22, parser.add_argument('-a', '--a_prob', required=False, default=0.22,
type=check_prob, type=check_prob,
help="Probability cut happens after nucleotide A") help="Probability cut happens after nucleotide A")
parser.add_argument('-t', '--T_prob', required=False, default=0.25, parser.add_argument('-t', '--t_prob', required=False, default=0.25,
type=check_prob, type=check_prob,
help="Probability cut happens after nucleotide T") help="Probability cut happens after nucleotide T")
parser.add_argument('-g', '--G_prob', required=False, default=0.25, parser.add_argument('-g', '--g_prob', required=False, default=0.25,
type=check_prob, type=check_prob,
help="Probability cut happens after nucleotide G") help="Probability cut happens after nucleotide G")
parser.add_argument('-c', '--C_prob', required=False, default=0.28, parser.add_argument('-c', '--c_prob', required=False, default=0.28,
type=check_prob, type=check_prob,
help="Probability cut happens after nucleotide C") help="Probability cut happens after nucleotide C")
parser.add_argument('-s', '--size', required=False, default=10000, parser.add_argument('-s', '--size', required=False, default=10000,
......
...@@ -5,41 +5,81 @@ import numpy as np ...@@ -5,41 +5,81 @@ import numpy as np
import pandas as pd # type: ignore import pandas as pd # type: ignore
def get_cut_number(seq_len: int, mean: float) -> int:
"""Get the number of cuts for a particular sequence.
Args:
seq_len (int): length of sequence/number of nucleotides in the sequence
mean (float): mean fragment length
Returns:
int: number of cuts
"""
if not isinstance(seq_len, int):
raise ValueError(f"Sequence length must be numeric, \
not {type(seq_len)}")
if not isinstance(mean, int):
raise ValueError(f"Mean must be numeric, not {type(mean)}")
cuts_distribution = [] # distribution of cut numbers (n_cuts)
# 1000 iterations should not be too computationally inefficient
# given the nature of the code
for _ in range(1000):
n_cuts = 0
len_sum = 0
while True:
len_sum += np.random.exponential(scale=mean)
if len_sum < seq_len:
n_cuts += 1
else:
cuts_distribution.append(n_cuts)
break
cuts_distribution.sort()
cut_counts = {x: cuts_distribution.count(x) for x in cuts_distribution}
cut_probs = [x/1000 for x in cut_counts.values()]
# randomly ick no. of cut from cut distribution based on probability
# of cut numbers
n_cuts = np.random.choice(list(cut_counts.keys()), p=cut_probs)
return n_cuts
def fragmentation(fasta: dict, seq_counts: pd.DataFrame, def fragmentation(fasta: dict, seq_counts: pd.DataFrame,
mean_length: int = 100, std: int = 10, nuc_probs: dict,
a_prob: float = 0.22, t_prob: float = 0.25, mu_length: int = 100, std: int = 10,
g_prob: float = 0.25, c_prob: float = 0.28
) -> list: ) -> list:
"""Fragment cDNA sequences and select terminal fragment. """Fragment cDNA sequences and select terminal fragment.
Args: Args:
fasta_file (dict): dictionary of {transcript IDs: sequences} fasta_file (dict): dictionary of {transcript IDs: sequences}
counts_file (pd.DataFrame): dataframe with sequence counts and IDs counts_file (pd.DataFrame): dataframe with sequence counts and IDs
mean_length (int): mean length of desired fragments nuc_probs (dict): probability of cut occuring a certain nucleotide. \
Ordered as A, T, G, C. E.g: {'A': 0.22, 'T': 0.25, \
'G': 0.25, 'C': 0.28}.
mu_length (int): mean length of desired fragments
std (int): standard deviation of desired fragment lengths std (int): standard deviation of desired fragment lengths
a_prob (float): probability of nucleotide A
t_prob (float): probability of nucleotide T
g_prob (float): probability of nucleotide G
c_prob (float): probability of nucleotide C
Returns: Returns:
list: list of selected terminal fragments list: list of selected terminal fragments
""" """
# calculated using https://www.nature.com/articles/srep04532#MOESM1 # calculated using https://www.nature.com/articles/srep04532#MOESM1
nuc_probs = {'A': a_prob, 'T': t_prob, 'G': g_prob, 'C': c_prob}
term_frags = [] term_frags = []
for seq_id, seq in fasta.items(): for seq_id, seq in fasta.items():
counts = seq_counts[seq_counts["seqID"] == seq_id]["count"] counts = seq_counts[seq_counts["seqID"] == seq_id]["count"]
for _ in range(counts): for _ in range(counts.iloc[0]):
n_cuts = int(len(seq)/mean_length) # pick no. of cuts from gauss fragment length distribution
# non-uniformly random DNA fragmentation implementation based on # non-uniformly random DNA fragmentation implementation based on
# https://www.nature.com/articles/srep04532#Sec1 # https://www.nature.com/articles/srep04532#Sec1
# assume fragmentation by sonication for NGS workflow # assume fragmentation by sonication for NGS workflow
cuts = [] cuts = []
cut_nucs = np.random.choice(list(nuc_probs.keys()), cut_nucs = np.random.choice(list(nuc_probs.keys()),
n_cuts, p=list(nuc_probs.values())) size=get_cut_number(len(seq),
mu_length),
p=list(nuc_probs.values()))
for nuc in cut_nucs: for nuc in cut_nucs:
nuc_pos = [x.start() for x in re.finditer(nuc, seq)] nuc_pos = [x.start() for x in re.finditer(nuc, seq)]
pos = np.random.choice(nuc_pos) pos = np.random.choice(nuc_pos)
...@@ -50,13 +90,11 @@ def fragmentation(fasta: dict, seq_counts: pd.DataFrame, ...@@ -50,13 +90,11 @@ def fragmentation(fasta: dict, seq_counts: pd.DataFrame,
cuts.sort() cuts.sort()
cuts.insert(0, 0) cuts.insert(0, 0)
term_frag = "" term_frag = ""
for i, val in enumerate(cuts):
if i == len(cuts)-1: # check if 3' fragment is in the correct size range
fragment = seq[val+1:cuts[-1]] if mu_length-std <= len(seq[cuts[-1]:len(seq)]) <= mu_length+std:
else: term_frag = seq[cuts[-1]:len(seq)]
fragment = seq[val:cuts[i+1]]
if mean_length-std <= len(fragment) <= mean_length+std:
term_frag = fragment
if term_frag != "": if term_frag != "":
term_frags.append(term_frag) term_frags.append(term_frag)
......
"""Test cli.py functions.""" """Test cli.py functions."""
import pytest import pytest
# from Bio import SeqIO
from term_frag_sel.cli import file_validation # type: ignore from term_frag_sel.cli import file_validation, main # type: ignore
FASTA_FILE = "tests/test_files/test.fasta" FASTA_FILE = "tests/test_files/test.fasta"
CSV_FILE = "tests/test_files/test.csv"
TAB_FILE = "tests/test_files/test_tab.csv"
_, csv_counts = file_validation(FASTA_FILE, CSV_FILE, ",")
_, tab_counts = file_validation(FASTA_FILE, TAB_FILE, "\t")
def test_file(): def test_file():
"""Test check_positive function.""" """Test file_validation function."""
assert isinstance(file_validation(FASTA_FILE, CSV_FILE, ","), tuple)
assert isinstance(file_validation(FASTA_FILE, TAB_FILE, "\t"), tuple)
assert csv_counts.equals(tab_counts)
with pytest.raises(FileNotFoundError):
file_validation(FASTA_FILE, "", ",")
with pytest.raises(FileNotFoundError): with pytest.raises(FileNotFoundError):
file_validation("", "", ",") file_validation("", CSV_FILE, ",")
with pytest.raises(ValueError):
file_validation(CSV_FILE, CSV_FILE, ",")
def test_main():
"""Test main() function."""
with pytest.raises(TypeError):
main("")
0,1,99
1,2,130
2,3,144
3,4,52
4,5,98
5,6,85
6,7,119
7,8,143
8,9,112
9,10,87
10,11,53
11,12,50
12,13,129
13,14,112
14,15,56
15,16,82
16,17,130
17,18,98
18,19,87
19,20,75
20,21,140
21,22,141
22,23,74
23,24,54
24,25,56
25,26,56
26,27,56
27,28,99
28,29,101
29,30,101
30,31,62
31,32,96
32,33,131
33,34,117
34,35,53
35,36,81
36,37,114
37,38,106
38,39,67
39,40,121
40,41,134
41,42,105
42,43,91
43,44,90
44,45,145
45,46,59
46,47,84
47,48,62
48,49,50
49,50,86
0 1 99
1 2 130
2 3 144
3 4 52
4 5 98
5 6 85
6 7 119
7 8 143
8 9 112
9 10 87
10 11 53
11 12 50
12 13 129
13 14 112
14 15 56
15 16 82
16 17 130
17 18 98
18 19 87
19 20 75
20 21 140
21 22 141
22 23 74
23 24 54
24 25 56
25 26 56
26 27 56
27 28 99
28 29 101
29 30 101
30 31 62
31 32 96
32 33 131
33 34 117
34 35 53
35 36 81
36 37 114
37 38 106
38 39 67
39 40 121
40 41 134
41 42 105
42 43 91
43 44 90
44 45 145
45 46 59
46 47 84
47 48 62
48 49 50
49 50 86
"""Test utils.py functions.""" """Test utils.py functions."""
import pandas as pd
import pytest import pytest
from Bio import SeqIO from Bio import SeqIO
from term_frag_sel.fragmentation import fragmentation # type: ignore from term_frag_sel.fragmentation import fragmentation, get_cut_number
with open("tests/test_files/test.fasta", "r", encoding="utf-8") as handle: FASTA_FILE = "tests/test_files/test.fasta"
fasta = SeqIO.parse(handle, "fasta") CSV_FILE = "tests/test_files/test.csv"
# have to create the counts file fasta_dict = {}
with open(FASTA_FILE, "r", encoding="utf-8") as handle:
fasta_sequences = SeqIO.parse(handle, "fasta")
for record in fasta_sequences:
fasta_dict[record.id] = str(record.seq).upper()
MU = 100 seq_counts = pd.read_csv(CSV_FILE, names=["seqID", "count"])
STD = 10 seq_counts = seq_counts.astype({"seqID": str})
A_PROB = 0.22
C_PROB = 0.28
T_PROB = 0.25
G_PROB = 0.25
NUC = {'A': 0.22, 'T': 0.25, 'G': 0.25, 'C': 0.28}
def test_frag():
def test_get_cut_number():
"""Test get_cut_number function."""
assert get_cut_number(100, 20) <= 15 and get_cut_number(100, 20) >= 0
with pytest.raises(ValueError):
get_cut_number(seq_len='a', mean=4)
with pytest.raises(ValueError):
get_cut_number(seq_len=10, mean='a')
def test_fragmentation():
"""Test fragmentation function.""" """Test fragmentation function."""
with pytest.raises(TypeError): assert isinstance(fragmentation(fasta_dict, seq_counts,
fragmentation() NUC), list)
# no need to check string mean or std since it's checked at CLI
with pytest.raises(ValueError):
nuc_probs = {'A': 'a', 'T': 0.25,
'G': 0.25, 'C': 0.28}
fragmentation(fasta_dict, seq_counts, nuc_probs)
with pytest.raises(ValueError):
nuc_probs = {'A': 0.22, 'T': 'a',
'G': 0.25, 'C': 0.28}
fragmentation(fasta_dict, seq_counts, nuc_probs)
with pytest.raises(ValueError):
nuc_probs = {'A': 0.22, 'T': 0.25,
'G': 'a', 'C': 0.28}
fragmentation(fasta_dict, seq_counts, nuc_probs)
with pytest.raises(ValueError):
nuc_probs = {'A': 0.22, 'T': 0.25,
'G': 0.25, 'C': 'a'}
fragmentation(fasta_dict, seq_counts, nuc_probs)
"""Test utils.py functions.""" """Test utils.py functions."""
<<<<<<< HEAD
import pytest
from
=======
import argparse import argparse
import pytest import pytest
...@@ -40,4 +35,3 @@ def test_prob(): ...@@ -40,4 +35,3 @@ def test_prob():
check_prob("string") check_prob("string")
with pytest.raises(ValueError): with pytest.raises(ValueError):
check_prob("") check_prob("")
>>>>>>> hugo_new
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment