Skip to content
Snippets Groups Projects
Commit d686c170 authored by Mate Balajti
Browse files

Merge branch 'dev' into 'main'

refactor: update tests, CI, main script

See merge request !40
parents 586ebf6e 7b1148fc
Branches
Tags
1 merge request: !40 refactor: update tests, CI, main script
Pipeline #17404 passed
...@@ -6,7 +6,6 @@ default: # Set default ...@@ -6,7 +6,6 @@ default: # Set default
stages: # List of stages for jobs, and their order of execution stages: # List of stages for jobs, and their order of execution
- build - build
- test - test
- deploy
build-job: # This job runs in the build stage, which runs first. build-job: # This job runs in the build stage, which runs first.
stage: build stage: build
...@@ -30,12 +29,6 @@ lint-test-job: # This job also runs in the test stage. ...@@ -30,12 +29,6 @@ lint-test-job: # This job also runs in the test stage.
- pip install -r requirements.txt - pip install -r requirements.txt
- pip install -r requirements-dev.txt - pip install -r requirements-dev.txt
- pip install -e . - pip install -e .
- flake8 --docstring-convention google --max-line-length 120 readsequencer/ --ignore=D212,D103,D104,D107,D100,D017,D415 - flake8 --docstring-convention google readsequencer/ tests/
#- pylint readsequencer/ tests/ - pylint readsequencer/ tests/
- mypy readsequencer/ tests/
deploy-job: # This job runs in the deploy stage. \ No newline at end of file
stage: deploy # It only runs when *both* jobs in the test stage complete successfully.
environment: production
script:
- echo "Deploying application..."
- echo "Application successfully deployed."
FROM python:3.11.1-slim-bullseye

# image metadata; `LABEL maintainer=` replaces the deprecated
# `MAINTAINER` instruction
LABEL maintainer="Christoph Harmel"

# set names for user, group and user home
ARG USER="bioc"
ARG GROUP="bioc"
ARG WORKDIR="/home/${USER}"

# create user, group, user home & makes the user and group
# the owner of the user home
RUN mkdir -p $WORKDIR \
    && groupadd -r $GROUP \
    && useradd --no-log-init -r -g $GROUP $USER \
    && chown -R ${USER}:${GROUP} $WORKDIR \
    && chmod 700 $WORKDIR

# set the user, make sure the location where pip
# installs binaries/executables is part of the $PATH
# and set the working directory to the user's home
USER $USER
ENV PATH="${WORKDIR}/.local/bin:${PATH}"
WORKDIR $WORKDIR

# copy entire content of the current directory to the
# working directory; ensure that this does not contain any
# files that you don't want to have end up in the Docker
# image, especially not any secrets!
# use `.dockerignore` to set files that you do NOT want
# Docker to copy
COPY --chown=${USER}:${GROUP} . $WORKDIR

# install app and development dependencies
# assumes that dependencies in `requirements.txt` are
# automatically installed when installing the app; if
# that is not the case, they need to be installed
# _before_ installing the app
# `--no-cache-dir` on both installs keeps pip's download
# cache out of the image layer
RUN pip install --no-cache-dir . \
    && pip install --no-cache-dir -r requirements-dev.txt

# set default command; optional
CMD ["readsequencer"]
"""Initialise read-sequencer."""
"""Receive command line arguments."""
import argparse import argparse
import logging import logging
from readsequencer.read_sequencer import ReadSequencer from readsequencer.read_sequencer import ReadSequencer
logging.basicConfig(
format='[%(asctime)s: %(levelname)s] %(message)s \
(module "%(module)s")',
level=logging.INFO,
)
logger = logging.getLogger(__name__)
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
def main(): def main():
"""Use CLI arguments to simulate sequencing."""
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
prog="readsequencer", prog="readsequencer",
description="Simulates sequencing of DNA sequences specified by an FASTA file.", description="Simulates sequencing of DNA sequences specified \
by a FASTA file.",
)
parser.add_argument(
"output",
help="path to FASTA file"
)
parser.add_argument(
"-i",
"--input",
default=None,
help="path to FASTA file"
) )
parser.add_argument("output", help="path to FASTA file")
parser.add_argument("-i", "--input", default=None, help="path to FASTA file")
parser.add_argument( parser.add_argument(
"-r", "--read-length", default=100, help="read length for sequencing", type=int "-r",
"--read-length",
type=int,
default=100,
help="read length for sequencing",
) )
parser.add_argument( parser.add_argument(
"-n", "-n",
"--n_random", "--n_random",
default=100, default=100,
type=int, type=int,
help="n random sequences. Just used if input fasta file is not specified.", help="n random sequences. Just used if input"
" fasta file is not specified.",
) )
parser.add_argument( parser.add_argument(
"-s", "-s",
...@@ -30,7 +52,6 @@ def main(): ...@@ -30,7 +52,6 @@ def main():
type=int, type=int,
help="chunk_size for batch processing", help="chunk_size for batch processing",
) )
args = parser.parse_args() args = parser.parse_args()
LOG.info("Read sequencer started.") LOG.info("Read sequencer started.")
if args.input is not None: if args.input is not None:
...@@ -55,10 +76,5 @@ def main(): ...@@ -55,10 +76,5 @@ def main():
LOG.info("Read sequencer finished.") LOG.info("Read sequencer finished.")
if __name__ == "__main__": if __name__ == '__main__':
logging.basicConfig(
format='[%(asctime)s: %(levelname)s] %(message)s (module "%(module)s")',
level=logging.INFO,
)
LOG = logging.getLogger(__name__)
main() main()
"""Main module for read sequencer."""
from random import choices from random import choices
from collections.abc import Generator, Iterator from collections.abc import Generator, Iterator
from Bio import SeqIO from Bio import SeqIO # type: ignore
from Bio.Seq import Seq from Bio.Seq import Seq # type: ignore
from Bio.SeqRecord import SeqRecord from Bio.SeqRecord import SeqRecord # type: ignore
class ReadSequencer: class ReadSequencer:
"""ReadSequencer class """ReadSequencer class.
Args: Args:
fasta: path fasta file fasta: path fasta file
...@@ -22,23 +23,22 @@ class ReadSequencer: ...@@ -22,23 +23,22 @@ class ReadSequencer:
def __init__( def __init__(
self, self,
fasta: str = None, fasta=None,
output: str = None, output=None,
read_length: int = 150, read_length: int = 150,
chunk_size: int = 10000, chunk_size: int = 10000,
) -> None: ) -> None:
"""Initialise class."""
self.fasta = fasta self.fasta = fasta
self.output = output self.output = output
self.read_length = read_length self.read_length = read_length
self.chunk_size = chunk_size self.chunk_size = chunk_size
self.random = False self.random = False
self.bases = ("A", "T", "C", "G") self.bases = ("A", "T", "C", "G")
self.n_sequences = None self.n_sequences: int
def get_n_sequences(self) -> None: def get_n_sequences(self) -> None:
""" """Detect number of sequences present in set fasta file.
Helper function to detect number of sequences present in set fasta file.
Returns: Returns:
None None
...@@ -46,8 +46,7 @@ class ReadSequencer: ...@@ -46,8 +46,7 @@ class ReadSequencer:
self.n_sequences = len(list(SeqIO.parse(self.fasta, "fasta"))) self.n_sequences = len(list(SeqIO.parse(self.fasta, "fasta")))
def define_random_sequences(self, n_seq: int) -> None: def define_random_sequences(self, n_seq: int) -> None:
""" """Define random sequences.
Defines random sequences.
Args: Args:
n_seq: number of random sequences to be generated n_seq: number of random sequences to be generated
...@@ -59,8 +58,7 @@ class ReadSequencer: ...@@ -59,8 +58,7 @@ class ReadSequencer:
self.n_sequences = n_seq self.n_sequences = n_seq
def generate_random_sequence(self, length: int) -> Seq: def generate_random_sequence(self, length: int) -> Seq:
""" """Generate random sequence.
Generates random sequence.
Args: Args:
length: length of sequence length: length of sequence
...@@ -73,7 +71,7 @@ class ReadSequencer: ...@@ -73,7 +71,7 @@ class ReadSequencer:
return seq return seq
def resize_sequence(self, record: SeqRecord) -> SeqRecord: def resize_sequence(self, record: SeqRecord) -> SeqRecord:
"""Resizes sequence """Resize sequence.
Resizes sequence according to set read length. If sequence is Resizes sequence according to set read length. If sequence is
shorter than read length, fills up with random nucleotides. shorter than read length, fills up with random nucleotides.
...@@ -93,7 +91,7 @@ class ReadSequencer: ...@@ -93,7 +91,7 @@ class ReadSequencer:
return record.seq return record.seq
def batch_iterator(self, iterator: Iterator, batch_size: int) -> Generator: def batch_iterator(self, iterator: Iterator, batch_size: int) -> Generator:
"""Generates batch iterator. """Generate batch iterator.
This is a generator function, and it returns lists of the This is a generator function, and it returns lists of the
entries from the supplied iterator. Each list will have entries from the supplied iterator. Each list will have
...@@ -114,7 +112,7 @@ class ReadSequencer: ...@@ -114,7 +112,7 @@ class ReadSequencer:
batch = [] batch = []
def run_sequencing(self) -> None: def run_sequencing(self) -> None:
"""Runs sequencing. """Run sequencing.
Runs read sequencing of specified sequences from input fasta file or Runs read sequencing of specified sequences from input fasta file or
generates random sequences for a given read length. If number of generates random sequences for a given read length. If number of
...@@ -125,7 +123,7 @@ class ReadSequencer: ...@@ -125,7 +123,7 @@ class ReadSequencer:
""" """
if self.random: if self.random:
if self.n_sequences <= self.chunk_size: if self.n_sequences <= self.chunk_size:
with open(self.output, "w") as output_handle: with open(self.output, "w", encoding="utf-8") as output_handle:
for i in range(self.n_sequences): for i in range(self.n_sequences):
record = SeqRecord( record = SeqRecord(
self.generate_random_sequence(self.read_length), self.generate_random_sequence(self.read_length),
...@@ -134,35 +132,44 @@ class ReadSequencer: ...@@ -134,35 +132,44 @@ class ReadSequencer:
SeqIO.write(record, output_handle, "fasta") SeqIO.write(record, output_handle, "fasta")
else: else:
batch_generator = self.batch_iterator( batch_generator = self.batch_iterator(
range(self.n_sequences), self.chunk_size iter(range(self.n_sequences)), self.chunk_size
) )
for i, batch in enumerate(batch_generator): for i, batch in enumerate(batch_generator):
filename = self.output.replace(".fasta", "") + "_chunk_%i.fasta" % ( filename = (
i + 1 self.output.replace(".fasta", "") +
f"_chunk_{i + 1}.fasta"
) )
with open(filename, "w") as output_handle: with open(
for j, k in enumerate(batch): filename, "w", encoding="utf-8"
) as output_handle:
for j, _ in enumerate(batch):
record = SeqRecord( record = SeqRecord(
self.generate_random_sequence(self.read_length), self.generate_random_sequence(
self.read_length
),
id="random_seq: " + str(j + 1), id="random_seq: " + str(j + 1),
) )
SeqIO.write(record, output_handle, "fasta") SeqIO.write(record, output_handle, "fasta")
else: else:
if self.n_sequences <= self.chunk_size: if self.n_sequences <= self.chunk_size:
with open(self.fasta) as input_handle, open( with open(self.fasta, encoding="utf-8") as input_handle, open(
self.output, "w" self.output, "w", encoding="utf-8"
) as output_handle: ) as output_handle:
for record in SeqIO.parse(input_handle, "fasta"): for record in SeqIO.parse(input_handle, "fasta"):
record.seq = self.resize_sequence(record) record.seq = self.resize_sequence(record)
SeqIO.write(record, output_handle, "fasta") SeqIO.write(record, output_handle, "fasta")
else: else:
record_iter = SeqIO.parse(open(self.fasta), "fasta") with open(self.fasta, encoding="utf-8") as file:
for i, batch in enumerate( record_iter = SeqIO.parse(file, "fasta")
self.batch_iterator(record_iter, self.chunk_size) for i, batch in enumerate(
): self.batch_iterator(record_iter, self.chunk_size)
filename = self.output.replace(".fasta", "") + "_chunk_%i.fasta" % (i + 1) ):
for j, record in enumerate(batch): filename = (
batch[j].seq = self.resize_sequence(record) self.output.replace(".fasta", "") +
with open(filename, "w") as handle: f"_chunk_{i + 1}.fasta"
SeqIO.write(batch, handle, "fasta") )
for j, record in enumerate(batch):
record.seq = self.resize_sequence(record)
with open(filename, "w", encoding="utf-8") as handle:
SeqIO.write(batch, handle, "fasta")
from setuptools import setup, find_packages """Setup tool."""
from pathlib import Path from pathlib import Path
from setuptools import setup, find_packages # type: ignore
project_root_dir = Path(__file__).parent.resolve() project_root_dir = Path(__file__).parent.resolve()
with open(project_root_dir / "requirements.txt", "r", encoding="utf-8") as _file: with open(
project_root_dir / "requirements.txt", "r", encoding="utf-8"
) as _file:
INSTALL_REQUIRES = _file.read().splitlines() INSTALL_REQUIRES = _file.read().splitlines()
URL = ('https://git.scicore.unibas.ch/zavolan_group/'
'tools/read-sequencer')
setup( setup(
name='readsequencer', name='readsequencer',
version='0.1.1', version='0.1.1',
url='https://git.scicore.unibas.ch/zavolan_group/tools/read-sequencer', url=URL,
license='MIT', license='MIT',
author='Clara Serger, Michael Sandholzer and Christoph Harmel', author='Clara Serger, Michael Sandholzer and Christoph Harmel',
author_email='christoph.harmel@unibas.ch', author_email='christoph.harmel@unibas.ch',
description='Simulates sequencing with a specified read length from sequences specified by a FASTA file.', description='Simulates sequencing with a specified read length from'
' sequences specified by a FASTA file.',
packages=find_packages(), packages=find_packages(),
install_requires=INSTALL_REQUIRES, install_requires=INSTALL_REQUIRES,
entry_points={'console_scripts': ['readsequencer=readsequencer.cli:main']} entry_points={
'console_scripts': [
'readsequencer=readsequencer.cli:main'
]
}
) )
"""Initialise testing."""
import readsequencer.cli """Test cli.py."""
import pytest import pytest
from cli_test_helpers import ArgvContext, shell from cli_test_helpers import ArgvContext, shell # type: ignore
import os import readsequencer.cli
import glob
def test_entrypoint(): def test_entrypoint():
""" """Test if entrypoint script is installed (setup.py)."""
Is entrypoint script installed? (setup.py)
"""
result = shell('readsequencer --help') result = shell('readsequencer --help')
assert result.exit_code == 0 assert result.exit_code == 0
def test_usage_no_args(): def test_usage_no_args():
""" """Test if CLI aborts w/o arguments, displaying usage instructions."""
Does CLI abort w/o arguments, displaying usage instructions?
"""
with ArgvContext('readsequencer'), pytest.raises(SystemExit): with ArgvContext('readsequencer'), pytest.raises(SystemExit):
readsequencer.cli.main() readsequencer.cli.main()
......
import pytest """Test read_sequencer.py."""
import os import os
import glob import glob
from readsequencer.read_sequencer import ReadSequencer from readsequencer.read_sequencer import ReadSequencer
def test_init_default(): def test_init_default():
"""Test default initiation."""
sequencer = ReadSequencer() sequencer = ReadSequencer()
assert sequencer.fasta is None assert sequencer.fasta is None
assert sequencer.read_length == 150 assert sequencer.read_length == 150
...@@ -13,6 +15,7 @@ def test_init_default(): ...@@ -13,6 +15,7 @@ def test_init_default():
def test_run_random(): def test_run_random():
"""Test random run."""
sequencer = ReadSequencer( sequencer = ReadSequencer(
output="./tests/fasta_testfile/results.fasta") output="./tests/fasta_testfile/results.fasta")
sequencer.define_random_sequences(n_seq=100) sequencer.define_random_sequences(n_seq=100)
...@@ -23,7 +26,9 @@ def test_run_random(): ...@@ -23,7 +26,9 @@ def test_run_random():
sequencer.run_sequencing() sequencer.run_sequencing()
os.remove("./tests/fasta_testfile/results.fasta") os.remove("./tests/fasta_testfile/results.fasta")
def test_run_random_chunks(): def test_run_random_chunks():
"""Test random run chunks."""
# setup class # setup class
sequencer = ReadSequencer( sequencer = ReadSequencer(
output="./tests/fasta_testfile/results.fasta", output="./tests/fasta_testfile/results.fasta",
...@@ -44,6 +49,7 @@ def test_run_random_chunks(): ...@@ -44,6 +49,7 @@ def test_run_random_chunks():
def test_run_sequencing(): def test_run_sequencing():
"""Test sequencing run."""
sequencer = ReadSequencer( sequencer = ReadSequencer(
fasta="./tests/fasta_testfile/50_seqs_50_1000_bp.fasta", fasta="./tests/fasta_testfile/50_seqs_50_1000_bp.fasta",
output="./tests/fasta_testfile/results.fasta", output="./tests/fasta_testfile/results.fasta",
...@@ -59,7 +65,9 @@ def test_run_sequencing(): ...@@ -59,7 +65,9 @@ def test_run_sequencing():
for file in result_file: for file in result_file:
os.remove(file) os.remove(file)
def test_run_sequencing_chunks(): def test_run_sequencing_chunks():
"""Test run sequencing chunks."""
# setup class # setup class
sequencer = ReadSequencer( sequencer = ReadSequencer(
fasta="./tests/fasta_testfile/50_seqs_50_1000_bp.fasta", fasta="./tests/fasta_testfile/50_seqs_50_1000_bp.fasta",
...@@ -78,6 +86,3 @@ def test_run_sequencing_chunks(): ...@@ -78,6 +86,3 @@ def test_run_sequencing_chunks():
assert len(result_files) == 5 assert len(result_files) == 5
for file in result_files: for file in result_files:
os.remove(file) os.remove(file)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment