Skip to content
Snippets Groups Projects
Commit 3f67d787 authored by Hugo Madge Leon's avatar Hugo Madge Leon
Browse files
parents 0d1e73eb ce859cd7
No related branches found
No related tags found
2 merge requests!57Readme more,!56Readme fixes
...@@ -8,7 +8,7 @@ import pandas as pd # type: ignore ...@@ -8,7 +8,7 @@ import pandas as pd # type: ignore
from term_frag_sel.fragmentation import fragmentation from term_frag_sel.fragmentation import fragmentation
from term_frag_sel.utils import check_positive, check_prob from term_frag_sel.utils import check_positive
logging.basicConfig( logging.basicConfig(
format='[%(asctime)s: %(levelname)s] %(message)s \ format='[%(asctime)s: %(levelname)s] %(message)s \
...@@ -38,14 +38,10 @@ def main(args: argparse.Namespace): ...@@ -38,14 +38,10 @@ def main(args: argparse.Namespace):
logger.info("Fragmentation of %s...", args.fasta) logger.info("Fragmentation of %s...", args.fasta)
splits = np.arange(0, len(list(fasta))+args.size, args.size) splits = np.arange(0, len(list(fasta))+args.size, args.size)
nuc_probs = {'A': args.a_prob, 'T': args.t_prob,
'G': args.g_prob, 'C': args.c_prob}
for i, split in enumerate(splits): for i, split in enumerate(splits):
fasta_dict = fasta[split:splits[i+1]] fasta_dict = fasta[split:splits[i+1]]
term_frags = fragmentation(fasta_dict, seq_counts, term_frags = fragmentation(fasta_dict, seq_counts,
nuc_probs, args.mean, args.std)
args.mean, args.std,
)
logger.info("Writing batch %s sequences to %s...", i, args.output) logger.info("Writing batch %s sequences to %s...", i, args.output)
with open(args.output, 'a', encoding="utf-8") as out_file: with open(args.output, 'a', encoding="utf-8") as out_file:
...@@ -119,18 +115,6 @@ def parse_arguments() -> argparse.Namespace: ...@@ -119,18 +115,6 @@ def parse_arguments() -> argparse.Namespace:
type=check_positive, type=check_positive,
help="Standard deviation fragment length \ help="Standard deviation fragment length \
(defafult: 60)") (defafult: 60)")
parser.add_argument('-a', '--a_prob', required=False, default=0.22,
type=check_prob,
help="Probability cut happens after nucleotide A")
parser.add_argument('-t', '--t_prob', required=False, default=0.25,
type=check_prob,
help="Probability cut happens after nucleotide T")
parser.add_argument('-g', '--g_prob', required=False, default=0.25,
type=check_prob,
help="Probability cut happens after nucleotide G")
parser.add_argument('-c', '--c_prob', required=False, default=0.28,
type=check_prob,
help="Probability cut happens after nucleotide C")
parser.add_argument('-s', '--size', required=False, default=10000, parser.add_argument('-s', '--size', required=False, default=10000,
type=check_positive, type=check_positive,
help="Chunk size for batch processing") help="Chunk size for batch processing")
......
"""Fragment sequences.""" """Fragment sequences."""
import re import random
import numpy as np
import pandas as pd # type: ignore import pandas as pd # type: ignore
def get_cut_number(seq_len: int, mean: float) -> int:
"""Get the number of cuts for a particular sequence.
Args:
seq_len (int): length of sequence/number of nucleotides in the sequence
mean (float): mean fragment length
Returns:
int: number of cuts
"""
if not isinstance(seq_len, int):
raise ValueError(f"Sequence length must be numeric, \
not {type(seq_len)}")
if not isinstance(mean, int):
raise ValueError(f"Mean must be numeric, not {type(mean)}")
cuts_distribution = [] # distribution of cut numbers (n_cuts)
# 1000 iterations should not be too computationally inefficient
# given the nature of the code
for _ in range(1000):
n_cuts = 0
len_sum = 0
while True:
len_sum += np.random.exponential(scale=mean)
if len_sum < seq_len:
n_cuts += 1
else:
cuts_distribution.append(n_cuts)
break
cuts_distribution.sort()
cut_counts = {x: cuts_distribution.count(x) for x in cuts_distribution}
cut_probs = [x/1000 for x in cut_counts.values()]
# randomly ick no. of cut from cut distribution based on probability
# of cut numbers
n_cuts = np.random.choice(list(cut_counts.keys()), p=cut_probs)
return n_cuts
def fragmentation(fasta: dict, seq_counts: pd.DataFrame, def fragmentation(fasta: dict, seq_counts: pd.DataFrame,
nuc_probs: dict, mean: int = 100, std: int = 10) -> list:
mu_length: int = 100, std: int = 10,
) -> list:
"""Fragment cDNA sequences and select terminal fragment. """Fragment cDNA sequences and select terminal fragment.
Args: Args:
fasta_file (dict): dictionary of {transcript IDs: sequences} fasta_file (dict): dictionary of {transcript IDs: sequences}
counts_file (pd.DataFrame): dataframe with sequence counts and IDs counts_file (pd.DataFrame): dataframe with sequence counts and IDs
nuc_probs (dict): probability of cut occuring a certain nucleotide. \ mean (int): mean length of desired fragments
Ordered as A, T, G, C. E.g: {'A': 0.22, 'T': 0.25, \
'G': 0.25, 'C': 0.28}.
mu_length (int): mean length of desired fragments
std (int): standard deviation of desired fragment lengths std (int): standard deviation of desired fragment lengths
Returns: Returns:
list: list of selected terminal fragments list: list of selected terminal fragments
""" """
# calculated using https://www.nature.com/articles/srep04532#MOESM1 if not isinstance(mean, int):
raise ValueError(f"Mean must be numeric, not {type(mean)}")
if not isinstance(std, int):
raise ValueError(f"Std must be numeric, not {type(mean)}")
term_frags = [] term_frags = []
for seq_id, seq in fasta.items(): for seq_id, seq in fasta.items():
counts = seq_counts[seq_counts["seqID"] == seq_id]["count"] counts = seq_counts[seq_counts["seqID"] == seq_id]["count"]
for _ in range(counts.iloc[0]): for _ in range(counts.iloc[0]):
# pick no. of cuts from gauss fragment length distribution
# non-uniformly random DNA fragmentation implementation based on
# https://www.nature.com/articles/srep04532#Sec1
# assume fragmentation by sonication for NGS workflow
cuts = [] cuts = []
cut_nucs = np.random.choice(list(nuc_probs.keys()), seq_len = len(seq)
size=get_cut_number(len(seq), prob_cut_per_base = 1/mean
mu_length),
p=list(nuc_probs.values())) for i in range(seq_len):
for nuc in cut_nucs: rand_prob = random.uniform(0, 1)
nuc_pos = [x.start() for x in re.finditer(nuc, seq)] if rand_prob < prob_cut_per_base:
pos = np.random.choice(nuc_pos) cuts.append(i)
while pos in cuts:
pos = np.random.choice(nuc_pos)
cuts.append(pos)
cuts.sort() cuts.sort()
cuts.insert(0, 0) cuts.insert(0, 0)
term_frag = "" term_frag = ""
# check if 3' fragment is in the correct size range # check if 3' fragment is in the correct size range
if mu_length-std <= len(seq[cuts[-1]:len(seq)]) <= mu_length+std: if mean - std <= len(seq[cuts[-1]:len(seq)]) <= mean + std:
term_frag = seq[cuts[-1]:len(seq)] term_frag = seq[cuts[-1]:len(seq)]
if term_frag != "": if term_frag != "":
......
...@@ -25,22 +25,3 @@ def check_positive(value: str) -> int: ...@@ -25,22 +25,3 @@ def check_positive(value: str) -> int:
got: {value}""") from exc got: {value}""") from exc
else: else:
return ivalue return ivalue
def check_prob(value: str) -> float:
"""Check probability value is within ]0,1] range.
Args:
value (str): command line parameter
Raises:
argparse.ArgumentTypeError: received a value outside valid range
Returns:
float: float version of input value
"""
pvalue = float(value)
if pvalue <= 0 or pvalue > 1:
raise argparse.ArgumentTypeError("""Expected a positive float between
0 and 1, but got {value}""")
return pvalue
...@@ -3,7 +3,7 @@ import pandas as pd ...@@ -3,7 +3,7 @@ import pandas as pd
import pytest import pytest
from Bio import SeqIO from Bio import SeqIO
from term_frag_sel.fragmentation import fragmentation, get_cut_number from term_frag_sel.fragmentation import fragmentation
FASTA_FILE = "tests/test_files/test.fasta" FASTA_FILE = "tests/test_files/test.fasta"
CSV_FILE = "tests/test_files/test.csv" CSV_FILE = "tests/test_files/test.csv"
...@@ -17,38 +17,13 @@ with open(FASTA_FILE, "r", encoding="utf-8") as handle: ...@@ -17,38 +17,13 @@ with open(FASTA_FILE, "r", encoding="utf-8") as handle:
seq_counts = pd.read_csv(CSV_FILE, names=["seqID", "count"]) seq_counts = pd.read_csv(CSV_FILE, names=["seqID", "count"])
seq_counts = seq_counts.astype({"seqID": str}) seq_counts = seq_counts.astype({"seqID": str})
NUC = {'A': 0.22, 'T': 0.25, 'G': 0.25, 'C': 0.28}
def test_get_cut_number():
"""Test get_cut_number function."""
assert get_cut_number(100, 20) <= 15 and get_cut_number(100, 20) >= 0
with pytest.raises(ValueError):
get_cut_number(seq_len='a', mean=4)
with pytest.raises(ValueError):
get_cut_number(seq_len=10, mean='a')
def test_fragmentation(): def test_fragmentation():
"""Test fragmentation function.""" """Test fragmentation function."""
assert isinstance(fragmentation(fasta_dict, seq_counts, assert isinstance(fragmentation(fasta_dict, seq_counts), list)
NUC), list)
# no need to check string mean or std since it's checked at CLI
with pytest.raises(ValueError):
nuc_probs = {'A': 'a', 'T': 0.25,
'G': 0.25, 'C': 0.28}
fragmentation(fasta_dict, seq_counts, nuc_probs)
with pytest.raises(ValueError): with pytest.raises(ValueError):
nuc_probs = {'A': 0.22, 'T': 'a', fragmentation(fasta_dict, seq_counts, mean='a')
'G': 0.25, 'C': 0.28}
fragmentation(fasta_dict, seq_counts, nuc_probs)
with pytest.raises(ValueError):
nuc_probs = {'A': 0.22, 'T': 0.25,
'G': 'a', 'C': 0.28}
fragmentation(fasta_dict, seq_counts, nuc_probs)
with pytest.raises(ValueError): with pytest.raises(ValueError):
nuc_probs = {'A': 0.22, 'T': 0.25, fragmentation(fasta_dict, seq_counts, std='a')
'G': 0.25, 'C': 'a'}
fragmentation(fasta_dict, seq_counts, nuc_probs)
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
import argparse import argparse
import pytest import pytest
from term_frag_sel.utils import check_positive, check_prob # type: ignore from term_frag_sel.utils import check_positive # type: ignore
def test_positive(): def test_positive():
...@@ -19,19 +19,3 @@ def test_positive(): ...@@ -19,19 +19,3 @@ def test_positive():
check_positive("string") check_positive("string")
with pytest.raises(argparse.ArgumentTypeError): with pytest.raises(argparse.ArgumentTypeError):
check_positive("") check_positive("")
def test_prob():
"""Test check_prob function."""
assert check_prob("0.1") == 0.1
assert check_prob("1") == 1.0
with pytest.raises(argparse.ArgumentTypeError):
check_prob("0")
with pytest.raises(argparse.ArgumentTypeError):
check_prob("10")
with pytest.raises(argparse.ArgumentTypeError):
check_prob("-1")
with pytest.raises(ValueError):
check_prob("string")
with pytest.raises(ValueError):
check_prob("")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment