Skip to content
Snippets Groups Projects
Commit e8f8acc9 authored by Michele Garioni's avatar Michele Garioni
Browse files

feat: add sampling from transcript file

parent fd2973bc
Branches
Tags
1 merge request: !7 feat: add sampling from transcript file
Pipeline #13574 passed
...@@ -3,3 +3,4 @@ flake8-docstrings ...@@ -3,3 +3,4 @@ flake8-docstrings
pytest pytest
mypy mypy
coverage coverage
pandas
\ No newline at end of file
"""Samples transcripts from input.
Samples a defined number of transcripts following
the relative RNA abundance per gene of a given input.
"""
import logging
from collections import Counter
from pathlib import Path
from random import choices
LOG = logging.getLogger(__name__)


def sample_from_input(
    input_file: Path,
    output_file: Path = Path.cwd() / 'sampled_cell.csv',
    n: int = 10000,
    sep: str = ',',
) -> None:
    """Sample transcripts following the relative abundance of the input.

    Reads a two-column gene expression file (gene identifier, count),
    draws ``n`` transcripts with replacement, weighting every gene by its
    share of the total count, and writes how many times each gene was
    drawn to a comma-separated file (one ``gene,count`` pair per line).
    Genes that were never drawn do not appear in the output.

    Note:
        The default ``output_file`` is computed once, when the module is
        imported, so it points at the working directory at import time.

    Args:
        input_file: path of the input gene expression file.
        output_file: path of the sampled gene expression file.
        n: total number of transcripts to be sampled.
        sep: separator of the input file.

    Raises:
        OSError: if the input or output file cannot be opened.
        IndexError: if a non-blank line does not contain ``sep``.
        ValueError: if a count cannot be converted to an integer.
        ZeroDivisionError: if the counts of the input sum up to zero.
    """
    input_dc = {}
    # the context manager closes the file even if a line fails to parse
    with open(input_file, 'r') as infile:
        LOG.info('reading file...')
        for myline in infile:
            # tolerate blank lines (e.g. a trailing newline at end of file)
            if not myline.strip():
                continue
            gene = myline.split(sep)
            input_dc[gene[0].strip()] = int(gene[1].strip())
    LOG.debug(input_dc)
    LOG.info('file read.')

    # extract count numbers and calculate relative abundance
    counts = list(input_dc.values())
    tot_counts = sum(counts)
    relative_value = [x / tot_counts for x in counts]

    # sampling with replacement, weighted by relative abundance
    LOG.info('sampling reads...')
    sampled_genes = choices(list(input_dc), weights=relative_value, k=n)
    # count the occurrences of every gene in the sampled list; Counter keeps
    # the order in which genes were first drawn
    sampled_dc = Counter(sampled_genes)
    LOG.info('reads sampled.')

    # write the sampled counts to a csv file, one "gene,count" per line
    with open(output_file, 'w') as outfile:
        LOG.info('writing output...')
        outfile.writelines(
            f'{gene_id},{count}\n' for gene_id, count in sampled_dc.items()
        )
    LOG.info('output written.')
GENE1,92
GENE2,13
GENE3,73
GENE4,83
GENE5,32
GENE6,136
GENE7,36
\ No newline at end of file
GENE1 92
GENE2 13
GENE3 73
GENE4 83
GENE5 32
GENE6 136
GENE7 36
\ No newline at end of file
"""Placeholder test for pipeline.""" """Placeholder test for pipeline."""
import pytest
import src import src
import re import re
from src import sampleinput as si
import pandas as pd
def test_version(): def test_version():
"""Assert that version matches semantic versioning format.""" """Assert that version matches semantic versioning format."""
assert re.match(r'\d\.\d\.\d', src.__version__) assert re.match(r'\d\.\d\.\d', src.__version__)
def test_sampleinput(tmpdir):
    """Check the sampled total and the errors raised on bad input.

    Samples from the tab-separated resource file into a temporary csv and
    verifies that the written counts add up to the requested number of
    transcripts. Then checks that reading the same file with the default
    separator raises ``IndexError`` and that a missing input file raises
    ``IOError``.
    """
    expected_total = 142958
    source = './tests/resources/Transcript2.tsv'
    target = tmpdir / 'test1.csv'

    si.sample_from_input(
        input_file=source,
        output_file=target,
        sep='\t',
        n=expected_total,
    )
    sampled = pd.read_table(target, header=None, sep=',')
    # column 1 of the output holds the per-gene sampled counts
    assert sampled[1].sum() == expected_total

    # no explicit separator here, so the function's default is used
    with pytest.raises(IndexError):
        si.sample_from_input(input_file=source)

    with pytest.raises(IOError):
        si.sample_from_input(input_file='file_not_existing.txt')
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment