# Source code for promod3.core.pm3argparse

"""
Extensions for the argparse module.
"""

import argparse
import sys
import os
import gzip
import tempfile

import ost
from ost import io, seq

from promod3.core import helper

def _AssembleTrgTplAln(target, template):
    """
    Internal function: Assemble a target-template alignment without leading/
    final gaps in the target sequence. Set the offset for the template sequence.

    :param target: Aligned target sequence (gaps marked by ``-``), e.g. as
                   returned by ``GetSequence`` of a loaded alignment. Needs to
                   provide ``length``, ``name``, indexing and ``str()``.
    :param template: Aligned template sequence, indexed with the same columns
                     as ``target`` (assumed to be of the same length).

    :returns: A new pairwise alignment, target first, template second, cut
              down to the columns between the first and the last non-gap
              position of the target. The offset of the second sequence is
              set to the number of leading columns cut away.
    """
    length = target.length
    # count leading gaps to get the start position; stays 0 if the target
    # consists of gaps only
    start = 0
    for i in range(length):
        if target[i] != '-':
            start = i
            break
    # get rid of closing gaps at the end: 'end' is one past the last non-gap
    # column. The scan has to go all the way down to column 0, otherwise a
    # target with its only residue in the first column keeps its closing gaps.
    end = length
    for i in range(length, 0, -1):
        if target[i-1] != '-':
            end = i
            break
    # assemble template sequence, restricted to the same columns
    tpl_str = ''.join(template[i] for i in range(start, end))
    new_aln = seq.CreateAlignment(seq.CreateSequence(target.name.strip(),
                                                     str(target)[start:end]),
                                  seq.CreateSequence(template.name.strip(),
                                                     tpl_str))
    new_aln.SetSequenceOffset(1, start)
    return new_aln

class PM3ArgumentParser(argparse.ArgumentParser):
    """
    This class is a child of :class:`argparse.ArgumentParser`. It provides a
    set of standard arguments which can be activated, rather than added via the
    traditional way. This helps keeping up a common naming scheme throughout
    all |project| actions. As a real extension, this subclass provides checking
    of input parameters on :meth:`Parse`. Beside this, everything you can do
    with a 'real' :class:`~argparse.ArgumentParser` instance is possible here.

    A note on exit codes: if :meth:`~pm3argparse.PM3ArgumentParser.Parse` is
    called on unrecognised arguments, the script exits with a code 2 by
    :class:`argparse.ArgumentParser.parse_args()`.

    Attributes beyond :class:`argparse.ArgumentParser`:

    .. attribute:: action

      Indicates if the calling script is a |project| action.

      :type: :class:`bool`
    """
    def __init__(self, description, action=True):
        """
        Create a new instance of :class:`~pm3argparse.PM3ArgumentParser`.

        :param description: Help text for this script, handed down to
                            |descattr|_ of |argpinit|_.
        :type description: :class:`str`

        :param action: Indicates if the calling script is a |project| action.
                       This influences |progattr|_ of
                       :class:`~argparse.ArgumentParser` by clipping of the
                       first 3 characters of the file name of the script. If
                       ``False``, default behaviour of
                       :class:`~argparse.ArgumentParser` kicks in.
        :type action: :class:`bool`

        :returns: :class:`argparse.ArgumentParser`.
        """
        prog = None
        if action:
            # drop the first 3 characters of the script's file name
            prog = os.path.basename(sys.argv[0])[3:]
        argparse.ArgumentParser.__init__(
            self, prog=prog, description=description,
            formatter_class=argparse.ArgumentDefaultsHelpFormatter)
        self.action = action
        # option packs switched on via the Add* methods, consumed by
        # AssembleParser() and Parse()
        self.activate = dict()

    def _print_message(self, message, file=None):
        #pylint: disable=redefined-builtin
        """
        This is like a welcome message to the "country of bad style"... we are
        overwriting a "_" function from the parent-class. Those guys should not
        be used outside of the housing module, never... but here it is a single
        function to bend :mod:`argparse` to use :class:`ost.Logger`.

        :param message: Text to be logged, a single trailing newline is
                        removed. Nothing is logged for an empty message.
        :param file: Destination requested by :mod:`argparse`. ``None`` and
                     ``sys.stderr`` go to ``ost.LogError``, everything else to
                     ``ost.LogScript``.
        """
        if message:
            no_nl_msg = message
            if message.endswith('\n'):
                no_nl_msg = message[:-1]
            if file is None or file is sys.stderr:
                ost.LogError(no_nl_msg)
            else:
                ost.LogScript(no_nl_msg)

    def Parse(self, args=None):
        """
        Parse an argument string.

        :param args: The argument string. As default |sysargv|_ is used.
        :type args: :class:`list`

        :returns: :class:`promod3.core.pm3argparse.PM3OptionsNamespace`.
        """
        opts = PM3OptionsNamespace()
        self.parse_args(args=args, namespace=opts)
        # run the checks belonging to the activated option packs
        opts.PostProcess(self.activate.keys())
        return opts

    def AssembleParser(self):
        """
        When adding options via the :meth:`Add*` methods, call this after you
        are done. Everything before just tells the parser that it should
        contain those option sets but does not actually add anything.
        :meth:`AssembleParser` will put everything in place, in the right order
        and with the right constraints.
        """
        if 'ALIGNMENT' in self.activate:
            self._AssembleAlignment()

    def AddAlignment(self):
        """
        Add everything needed to load alignments to the argument parser. Creates
        several options/ arguments and adds some checks for post processing.
        This method only adds a flag to the parser to add alignment options on
        :meth:`AssembleParser`. Depending on which options you activate, things
        need to be added in a different order or have other constraints.

        Options/ arguments added:

        * ``--fasta trg:<NAME> <FILE>`` - describing a target-template alignment
          with ``trg:`` marking the target sequence inside :file:`<FILE>`

        Exit codes related to alignment input:

        * 11 - no prefix ``trg:`` found for an argument to '--fasta'

        * 12 - a given alignment file does not exist

        * 13 - never raised (parameter for checking gzip files)

        * 14 - empty target name found (``trg:``)

        * 15 - found an empty alignment file

        * 16 - alignment with more than 2 sequences found

        * 17 - target sequence name not found in alignment

        * 18 - sequences in the alignment have different length

        Attributes added to the namespace returned by :meth:`Parse`:

        * :attr:`fasta` - filled with the input of the '--fasta' argument, a
          :class:`list` with multiple :class:`list` objects

        * :attr:`alignments` - :class:`ost.AlignmentList`, same order as
          :attr:`fasta`

        * :attr:`aln_sources` - the original source of the alignment, may be
          filename(s) or a string in JSON format, :class:`list` of all sources
        """
        self.activate['ALIGNMENT'] = 1

    def _AssembleAlignment(self):
        """
        Actually add alignment arguments/ options
        """
        # FastA input: - always pairwise alignments
        #              - callable multiple times
        #              - goes by 'trg:<SEQNAME> <FILE>'
        #              - excludes JSON file/ object
        #              - leading whitespaces will be deleted
        self.add_argument('-f', '--fasta', nargs=2, action='append',
                          metavar=('trg:<NAME>', '<FILE>'),
                          help='Pairwise alignment in FastA format, needs to '+
                          'declare what is the target sequence.')
        # input: FastA/ JSON
        # determined by extension: if we are wrong, the whole loading fails
        # possibility to add JSON: mention limitation!

class PM3OptionsNamespace(object):
    """
    This one is mainly for internal use. You can use it like everything that
    comes out of :meth:`argparse.ArgumentParser.parse_args`. Attributes are
    added regarding how you assembled your argument parser.
    """
    def __init__(self):
        pass

    def PostProcess(self, activated):
        """
        Post processing of activated option packs.

        :param activated: Names of the option packs which were activated on
                          the parser. Only ``'ALIGNMENT'`` is evaluated here.
        """
        if 'ALIGNMENT' in activated:
            self._PostProcessAlignment()

    def _PostProcessAlignment(self):
        #pylint: disable=no-member
        #pylint: disable=attribute-defined-outside-init
        """
        Doing some extra work after parsing.

        Creates the attributes :attr:`aln_sources` (list of alignment file
        names) and :attr:`alignments` (``seq.AlignmentList``). For every pair
        given via '--fasta', the target name and the file are identified, the
        file is loaded (gzipped files are unpacked into a temporary file first)
        and the alignment is stored with the target as first sequence.

        Errors are reported via ``helper.MsgErrorAndExit`` with the codes 11,
        14, 15, 16, 17 and 18; codes 12 and 13 are handed to
        ``helper.FileExists`` and ``helper.FileGzip``.
        """
        self.aln_sources = list()
        self.alignments = seq.AlignmentList()
        if self.fasta:
            for src in self.fasta:
                # 'trg:<NAME>' and '<FILE>' may come in any order
                if src[0].startswith('trg:'):
                    trgname = src[0][4:]
                    seqfile = src[1]
                elif src[1].startswith('trg:'):
                    trgname = src[1][4:]
                    seqfile = src[0]
                else:
                    helper.MsgErrorAndExit("'--fasta' requires one argument "+
                                           "prefixed with 'trg:' marking the "+
                                           "target sequence name", 11)
                if not trgname:
                    helper.MsgErrorAndExit("'--fasta' requires argument "+
                                           "'trg:' defining the "+
                                           "target sequence name, empty one "+
                                           "found: '%s'" % ' '.join(src), 14)
                helper.FileExists("Alignment", 12, seqfile)
                is_gz = helper.FileGzip("Alignment", 13, seqfile)
                readfile = seqfile
                if is_gz:
                    # the loader gets a file name, so unpack into a temporary
                    # file; data is handled as bytes from reading to writing
                    with gzip.open(seqfile, 'rb') as zip_fh:
                        unzip_str = zip_fh.read()
                    unzip_file = tempfile.NamedTemporaryFile(mode='wb',
                                                             suffix='.fas')
                    unzip_file.write(unzip_str)
                    unzip_file.flush()
                    readfile = unzip_file.name
                try:
                    aln = io.LoadAlignment(readfile, format="fasta")
                except Exception as exc: #pylint: disable=broad-except
                    # known loader messages are turned into exit codes,
                    # everything else is passed on untouched
                    exc_msg = str(exc)
                    if exc_msg == 'Bad FASTA file: File is empty':
                        helper.MsgErrorAndExit("'--fasta' refers to an empty "+
                                               "file or its in the wrong "+
                                               "format: %s" % seqfile, 15)
                    elif exc_msg == 'sequences have different lengths':
                        helper.MsgErrorAndExit("'--fasta %s': " % ' '.join(src)+
                                               "sequences in the alignment "+
                                               "have different length.", 18)
                    else:
                        raise
                finally:
                    # closing the temporary file is what removes it
                    if is_gz:
                        unzip_file.close()
                # check alignment
                # NOTE(review): alignments with fewer than 2 sequences are not
                # caught here, GetSequence(1) is called on them regardless.
                nos = aln.GetCount()
                if nos > 2:
                    helper.MsgErrorAndExit("'--fasta %s' " % ' '.join(src)+
                                           "points to an alignment with "+
                                           "more than 2 sequences.", 16)
                fst_seq = aln.GetSequence(0)
                snd_seq = aln.GetSequence(1)
                # the target always goes first into the stored alignment
                if fst_seq.name.strip() == trgname:
                    new_aln = _AssembleTrgTplAln(fst_seq, snd_seq)
                elif snd_seq.name.strip() == trgname:
                    new_aln = _AssembleTrgTplAln(snd_seq, fst_seq)
                else:
                    helper.MsgErrorAndExit("'--fasta %s' " % ' '.join(src)+
                                           "does not define a target name "+
                                           "found in the alignment.", 17)
                self.alignments.append(new_aln)
                self.aln_sources.append(seqfile)

# LocalWords: param attr prog argparse ArgumentParser bool sys os init str
# LocalWords: progattr descattr argpinit argv formatter meth args namespace
# LocalWords: ArgumentDefaultsHelpFormatter sysargv AssembleParser fasta io
# LocalWords: metavar trg tpl FastA gzip tempfile ost promod aln stderr src
# LocalWords: AssembleTrgTplAln CreateSequence SetSequenceOffset LogError
# LocalWords: LogScript OptionsNamespace PostProcess AssembleAlignment JSON
# LocalWords: AddAlignment AlignmentList SEQNAME whitespaces nargs trgname
# LocalWords: PostProcessAlignment startswith seqfile elif MsgErrorAndExit
# LocalWords: len FileExists gz FileGzip readfile fh NamedTemporaryFile fas
# LocalWords: LoadAlignment exc GetCount fst GetSequence snd