Source code for promod3.core.pm3argparse

"""
Extensions for the argparse module.
"""

import argparse
import sys
import os
import gzip
import tempfile
#try:
#    import ujson as json
#except ImportError:
import json

import ost
from ost import io, seq

from promod3.core import helper

def _TmpForGZip(filename, suffix, msg_prefix):
    """Unpack a gzipped file into a temporary file.

    :param filename: Path of the gzipped input file.
    :param suffix: File name suffix of the temporary file (e.g. ``'.json'``).
    :param msg_prefix: Prefix handed to :func:`helper.FileExists` together with
                       exit code 12 (documented as "file does not exist").

    :returns: The still open :class:`tempfile.NamedTemporaryFile` holding the
              unpacked data. The file vanishes from disk as soon as it is
              closed, so the caller has to keep the returned object alive
              while reading from its :attr:`name` and close it afterwards.
    """
    helper.FileExists(msg_prefix, 12, filename)
    zip_fh = gzip.open(filename)
    # make sure the gzip handle is released even if decompression fails
    try:
        unzip_str = zip_fh.read()
    finally:
        zip_fh.close()
    # gzip delivers raw bytes, so the copy is written in binary mode; this
    # keeps the content untouched on every platform/ Python version
    unzip_file = tempfile.NamedTemporaryFile(mode='wb', suffix=suffix)
    unzip_file.write(unzip_str)
    # flush so that readers opening the file by name see the whole content
    unzip_file.flush()
    return unzip_file

def _CheckJSONAlnSeqKeyType(key_name, val_type, json_aln, seqtype, json_source):
    '''Check a key/ value in a sequence exists and is of certain type.
    '''
    if key_name not in json_aln[seqtype].keys():
        helper.MsgErrorAndExit("JSON 'alignmentlist' '%s' " % seqtype+
                               "from '%s' is " % json_source+
                               "missing the '%s' key" % key_name, 27)
    altype = type(json_aln[seqtype][key_name])

    if val_type is str or val_type is unicode:
        if not (altype is unicode or altype is str):
            helper.MsgErrorAndExit("JSON 'alignmentlist' '%s' " % seqtype+
                                   "'%s' from" % key_name+
                                   "'%s' is not a " % json_source+
                                   "%s" % str(val_type), 28)
    elif not altype is val_type:
        helper.MsgErrorAndExit("JSON 'alignmentlist' '%s' " % seqtype+
                               "'%s' from" % key_name+
                               "'%s' is not a " % json_source+
                               "%s" % str(val_type), 28)

def _CreateNewAln(trg_name, trg_seq, trg_start, trg_end, tpl_name, tpl_seq,
                  tpl_offset):
    # internal function bundling alignment creation for its callers, the long
    # argument list is accepted for that reason, pylint ignored
    #pylint: disable=too-many-arguments
    '''Build a pairwise alignment of a target and a template sequence.

    The target sequence is cut down to ``trg_seq[trg_start:trg_end]``, the
    template sequence is taken as is. The first sequence gets the role
    'TARGET', the second one the role 'TEMPLATE' plus ``tpl_offset`` as its
    sequence offset.
    '''
    target_part = seq.CreateSequence(trg_name, trg_seq[trg_start:trg_end])
    template_part = seq.CreateSequence(tpl_name, tpl_seq)
    pairwise = seq.CreateAlignment(target_part, template_part)
    # index 0 is the target, index 1 the template
    pairwise.SetSequenceRole(0, 'TARGET')
    pairwise.SetSequenceRole(1, 'TEMPLATE')
    pairwise.SetSequenceOffset(1, tpl_offset)
    return pairwise

def _GetAlnFromJSON(json_object, json_source):
    """Create alignments from a JSON object.

    Iterate the alignments in a JSON object and deliver OST alignments via the
    yield operator.

    :param json_object: Decoded JSON data, expected to be a dictionary with an
                        'alignmentlist' key pointing to a list of alignments.
    :param json_source: Origin of the JSON data, only used in messages.

    Malformed input is reported via :func:`helper.MsgErrorAndExit` using the
    codes 21, 22, 24, 25 and 26 here, 27 and 28 come from
    :func:`_CheckJSONAlnSeqKeyType`.
    """
    # alignments are stored via the 'alignmentlist' key; anything which is not
    # a dictionary can not provide that key
    if not isinstance(json_object, dict) or 'alignmentlist' not in json_object:
        helper.MsgErrorAndExit("JSON object from '%s' does not " % json_source+
                               "provide an 'alignmentlist' key.", 21)
    # alignments come as lists, to enable hetero oligos
    if not isinstance(json_object['alignmentlist'], list):
        helper.MsgErrorAndExit("JSON object from '%s' does not " % json_source+
                               "provide a list behind 'alignmentlist'.", 24)
    # take the alignments apart, each alignment is a dictionary
    for json_aln in json_object['alignmentlist']:
        # json_aln needs to be a dictionary
        if not isinstance(json_aln, dict):
            helper.MsgErrorAndExit("JSON 'alignmentlist' member from "+
                                   "'%s' is not a " %  json_source+
                                   "dictionary: %s" % (json_aln,), 25)
        # an alignment has a 'target' and a 'template' dictionary
        # each of them has a 'name' and a 'seqres' pair
        for flav in ['target', 'template']:
            if flav not in json_aln:
                helper.MsgErrorAndExit("JSON 'alignmentlist' from "+
                                       "'%s' does not " % json_source+
                                       "provide a '%s' key." % flav, 22)
            # check sequence to be dictionary
            if not isinstance(json_aln[flav], dict):
                helper.MsgErrorAndExit("JSON 'alignmentlist' '%s' from " % flav+
                                       "'%s' is not a " % json_source+
                                       "dictionary: %s" % (json_aln[flav],),
                                       26)
            # check for keys needed by both sequences:
            for aln_key in ['name', 'seqres']:
                _CheckJSONAlnSeqKeyType(aln_key, str, json_aln, flav,
                                        json_source)
        # only the template carries an offset
        _CheckJSONAlnSeqKeyType('offset', int, json_aln, 'template',
                                json_source)

        # the full target sequence is used, names get rid of surrounding
        # whitespaces
        yield _CreateNewAln(str(json_aln['target']['name']).strip(),
                            str(json_aln['target']['seqres']),
                            0,
                            len(json_aln['target']['seqres']),
                            str(json_aln['template']['name']).strip(),
                            str(json_aln['template']['seqres']),
                            json_aln['template']['offset'])

def _GetJSONOBject(json_input):
    """Get a JSON object out of a string which may be an object or a path.

    If the input string starts with '{', we assume its a JSON object. File names
    starting with '{' would be a bit weird.

    If we are looking at a file, check and load it.

    For a JSON object, check that everything is there. No checks for
    superfluous stuff.

    As returnvalue we only use JSON objects.
    """
    if json_input[0] != '{':
        is_gz = helper.FileGzip("JSON alignment", 13, json_input)
        readfile = json_input
        if is_gz:
            unzip_file = _TmpForGZip(json_input, '.json', "JSON alignment")
            readfile = unzip_file.name
        try:
            jfh = open(readfile)
        except IOError, ioe:
            helper.MsgErrorAndExit("'--json' file '%s' " % json_input+
                                   "can not be processed: %s" % ioe.strerror,
                                   19)
        except:
            raise
        try:
            json_object = json.load(jfh)
        except ValueError, vae:
            if vae.message == 'No JSON object could be decoded':
                helper.MsgErrorAndExit("'--json' file '%s' could " % json_input+
                                       "not be processed into a JSON object, "+
                                       "probably it's empty.", 20)
            else:
                raise
        except:
            raise
        jfh.close()
    else:
        try:
            json_object = json.loads(json_input)
        except ValueError, vae:
            helper.MsgErrorAndExit("'--json' string '%s' " % json_input+\
                                   "could not be decoded: %s" % vae.message, 23)
    return json_object

def _GetTrgNameSeqFile(argstr):
    """Split the two items given to '--fasta' into target name and file name.

    The items may come in arbitrary order, the one carrying the 'trg:' prefix
    is taken as target name (prefix removed), the other one as sequence file.
    As a consequence, names of sequence files may not start with 'trg:'.

    Error code 11 is used if no item has the prefix, 14 if the target name is
    empty. The sequence file is checked by :func:`helper.FileExists` with
    code 12.

    :returns: Target name and name of the sequence file, in this order.
    """
    marker = 'trg:'
    if argstr[0].startswith(marker):
        trgname, seqfile = argstr[0][4:], argstr[1]
    elif argstr[1].startswith(marker):
        trgname, seqfile = argstr[1][4:], argstr[0]
    else:
        shown_args = ' '.join(argstr)
        no_prefix_msg = "'--fasta %s' requires " % shown_args
        no_prefix_msg += "one argument prefixed with 'trg:' marking "
        no_prefix_msg += "the target sequence name"
        helper.MsgErrorAndExit(no_prefix_msg, 11)
    # a bare 'trg:' without a name attached is of no use
    if not trgname:
        shown_args = ' '.join(argstr)
        empty_msg = "'--fasta %s' requires " % shown_args
        empty_msg += "argument 'trg:' defining the "
        empty_msg += "target sequence name, empty one "
        empty_msg += "found: '%s'" % shown_args
        helper.MsgErrorAndExit(empty_msg, 14)
    helper.FileExists("Alignment", 12, seqfile)

    return trgname, seqfile

def _FetchAlnFromFastaOpt(argstr):
    """
    Disassemble an argument to '--fasta' into an alignment and return.

    :param argstr: The two items handed to a single '--fasta' option, target
                   name prefixed by 'trg:' and alignment file in any order.

    :returns: Name of the alignment file and the target-template alignment
              assembled by :func:`_AssembleTrgTplAln`.

    Errors are reported via :func:`helper.MsgErrorAndExit`: code 15 for an
    empty file/ wrong format, 16 for more than two sequences, 17 if the target
    name is not found in the alignment and 18 for sequences of different
    length.
    """
    trgname, seqfile = _GetTrgNameSeqFile(argstr)
    # only needed for messages
    shown_args = ' '.join(argstr)
    # checking if alignment file has 'gz' extension
    is_gz = helper.FileGzip("Alignment", 13, seqfile)
    # loading the alignment, switch for gzip
    readfile = seqfile
    unzip_file = None
    if is_gz:
        unzip_file = _TmpForGZip(seqfile, '.fas', "Alignment")
        readfile = unzip_file.name
    try:
        aln = io.LoadAlignment(readfile, format="fasta")
    except Exception as exc: #pylint: disable=broad-except
        # the loader is recognised by its message text; str() is used since
        # the 'message' attribute of exceptions is deprecated
        exc_msg = str(exc)
        if exc_msg == 'Bad FASTA file: File is empty':
            helper.MsgErrorAndExit("'--fasta %s' " % shown_args+
                                   "refers to an empty file or its in the "+
                                   "wrong format.", 15)
        elif exc_msg == 'sequences have different lengths':
            helper.MsgErrorAndExit("'--fasta %s': " % shown_args+
                                   "sequences in the alignment "+
                                   "have different length.", 18)
        else:
            raise
    finally:
        # the temporary copy of a gzipped file is not needed beyond loading
        if unzip_file is not None:
            unzip_file.close()
    # checking the alignment: only 2 sequences allowed, target name must be
    # the name of one of the sequences
    # NOTE(review): an alignment with a single sequence passes this check and
    # is only caught by whatever GetSequence(1) does then - confirm.
    nos = aln.GetCount()
    if nos > 2:
        helper.MsgErrorAndExit("'--fasta %s' points to " % shown_args+
                               "an alignment with more than 2 sequences.",
                               16)
    fst_seq = aln.GetSequence(0)
    snd_seq = aln.GetSequence(1)
    if fst_seq.name.strip() == trgname:
        new_aln = _AssembleTrgTplAln(fst_seq, snd_seq)
    elif snd_seq.name.strip() == trgname:
        new_aln = _AssembleTrgTplAln(snd_seq, fst_seq)
    else:
        helper.MsgErrorAndExit("'--fasta %s' does not " % shown_args+
                               "define a target name found in the "+
                               "alignment.", 17)
    return seqfile, new_aln

def _AssembleTrgTplAln(target, template):
    """
    Internal function: Assemble a target-template alignment without leading/
    final gaps in the target sequence. Set the offset for the template sequence.

    :param target: Target sequence as found in the input alignment.
    :param template: Template sequence as found in the input alignment.

    :returns: Alignment created by :func:`_CreateNewAln`, covering the columns
              from the first to the last non-gap character of the target. The
              index of the first column is used as offset of the template.
    """
    # count leading gaps to get the start position
    start = 0
    for i in range(target.length):
        if target[i] != '-':
            start = i
            break
    # get rid of closing gaps at the end; the scan has to reach the very first
    # column (i == 1), otherwise a target with its only residue in column 0
    # keeps all its trailing gaps
    end = target.length
    for i in range(target.length, 0, -1):
        if target[i-1] != '-':
            end = i
            break
    # assemble template sequence, same columns as for the target
    tpl_str = ''.join([template[i] for i in range(start, end)])
    return _CreateNewAln(target.name.strip(), str(target), start, end,
                         template.name.strip(), tpl_str, start)


class PM3StoreOnceAction(argparse.Action):
    """Store the value of an option, complain if the option shows up twice.

    Construction is left to :class:`argparse.Action`, nothing is added there.
    """
    #pylint: disable=too-few-public-methods
    def __call__(self, parser, namespace, values, option_string=None):
        already_there = getattr(namespace, self.dest, None)
        if already_there is None:
            # first occurrence of the option, just keep what we got
            setattr(namespace, self.dest, values)
            return
        raise argparse.ArgumentError(self, 'may only be used once.')

class PM3ArgumentParser(argparse.ArgumentParser):
    """
    This class is a child of :class:`argparse.ArgumentParser`. It provides a
    set of standard arguments which can be activated, rather than added via the
    traditional way. This helps keeping up a common naming scheme throughout
    all |project| actions. As a real extension, this subclass provides checking
    of input parameters on :meth:`Parse`. Beside this, everything you can do
    with a 'real' :class:`~argparse.ArgumentParser` instance is possible here.

    A note on exit codes: if :meth:`~pm3argparse.PM3ArgumentParser.Parse` is
    called on unrecognised arguments, the script exits with a code 2 by
    :class:`argparse.ArgumentParser.parse_args()`.

    Attributes beyond :class:`argparse.ArgumentParser`:

    .. attribute:: action

      Indicates if the calling script is a |project| action.

      :type: :class:`bool`
    """
    def __init__(self, description, action=True):
        """
        Create a new instance of :class:`~pm3argparse.PM3ArgumentParser`.

        :param description: Help text for this script, handed down to
                            |descattr|_ of |argpinit|_.
        :type description: :class:`str`

        :param action: Indicates if the calling script is a |project| action.
                       This influences |progattr|_ of
                       :class:`~argparse.ArgumentParser` by clipping of the
                       first 3 characters of the file name of the script. If
                       ``False``, default behaviour of
                       :class:`~argparse.ArgumentParser` kicks in.
        :type action: :class:`bool`

        :returns: :class:`argparse.ArgumentParser`.
        """
        prog = None
        if action:
            prog = os.path.basename(sys.argv[0])[3:]
        argparse.ArgumentParser.__init__(
            self, prog=prog, description=description,
            formatter_class=argparse.ArgumentDefaultsHelpFormatter)
        self.action = action
        # option packs requested via the Add* methods, used as a set of names
        self.activate = dict()

    def _print_message(self, message, file=None):
        #pylint: disable=redefined-builtin
        """
        This is like a welcome message to the "country of bad style"... we are
        overwriting a "_" function from the parent-class. Those guys should not
        be used outside of the housing module, never... but here it is a single
        function to bend :mod:`argparse` to use :class:`ost.Logger`.
        """
        if message:
            # the logger gets the message without its final newline
            no_nl_msg = message
            if message[-1] == '\n':
                no_nl_msg = message[:-1]
            # everything headed for stderr is an error, the rest script output
            if file is None or file is sys.stderr:
                ost.LogError(no_nl_msg)
            else:
                ost.LogScript(no_nl_msg)

    def Parse(self, args=None):
        """
        Parse an argument string.

        :param args: The argument string. As default |sysargv|_ is used.
        :type args: :class:`list`

        :returns: :class:`promod3.core.pm3argparse.PM3OptionsNamespace`.
        """
        opts = PM3OptionsNamespace()
        self.parse_args(args=args, namespace=opts)
        opts.PostProcess(self.activate.keys())
        return opts

    def AssembleParser(self):
        """
        When adding options via the :meth:`Add*` methods, call this after you
        are done. Everything before just tells the parser that it should
        contain those option sets but does not actually add anything.
        :meth:`AssembleParser` will put everything in place, in the right order
        and with the right constraints.
        """
        if 'ALIGNMENT' in self.activate:
            self._AssembleAlignment()

    def AddAlignment(self):
        """Commandline options for alignments.

        Add everything needed to load alignments to the argument parser.
        Creates several options/ arguments and adds some checks for post
        processing. This method only adds a flag to the parser to add alignment
        options on :meth:`AssembleParser`. Depending on which options you
        activate, things need to be added in a different order or have other
        constraints.

        Options/ arguments added:

        * ``-f/ --fasta trg:<NAME> <FILE>`` - describing a target-template
          alignment with ``trg:`` marking the target sequence inside
          :file:`<FILE>`. The order of arguments is arbitrary which means file
          names starting with :file:`trg:` will not work.

        * ``-j/ --json <OBJECT>|<FILE>`` - target-template alignments in JSON
          format. Either an object string or a file name. The string variant is
          limited to how many characters your command line can gobble.

        Exit codes related to alignment input:

        * 11 - no prefix ``trg:`` found for an argument to ``--fasta``

        * 12 - a given alignment file does not exist

        * 13 - never raised (parameter for checking gzip files)

        * 14 - empty target name found (``trg:``)

        * 15 - found an empty alignment file

        * 16 - alignment with more than 2 sequences found

        * 17 - target sequence name not found in alignment

        * 18 - sequences in the alignment have different length

        * 19 - problem with a JSON formatted file handed over to ``--json``

        * 20 - JSON file could not be decoded into a JSON object

        * 21 - JSON object has no 'alignmentlist' key

        * 22 - JSON object has no 'target'/ 'template' in the 'alignmentlist'

        * 23 - JSON string could not be decoded

        * 24 - JSON object 'alignmentlist' does not point to a list

        * 25 - JSON object 'alignmentlist' member is not a dictionary

        * 26 - JSON object 'alignmentlist' 'target'/ 'template' does not point
          to a dictionary

        * 27 - JSON object 'alignmentlist' 'target'/ 'template' does not have
          a needed key

        * 28 - JSON object 'alignmentlist' 'target'/ 'template' has a value of
          wrong type

        Attributes added to the namespace returned by :meth:`Parse`:

        * :attr:`fasta` - filled with the input of the ``--fasta`` option, a
          :class:`list` with multiple :class:`list` objects

        * :attr:`json` - argument of the ``--json`` option, :class:`str`. May
          be a filename or a JSON object string.

        * :attr:`alignments` - :class:`ost.AlignmentList`, same order as
          :attr:`fasta`, likely to **not** follow the order of JSON input;
          first sequence of the alignment is the target sequence, if in doubt,
          check for sequence roles ``TARGET`` or ``TEMPLATE``

        * :attr:`aln_sources` - the original source of the alignment, may be
          filename(s) or a string in JSON format, :class:`list` of all sources
        """
        self.activate['ALIGNMENT'] = 1

    def _AssembleAlignment(self):
        """
        Actually add alignment arguments/ options
        """
        aln_grp = self.add_mutually_exclusive_group(required=True)
        # FastA input: - always pairwise alignments
        #              - callable multiple times
        #              - goes by 'trg:<SEQNAME> <FILE>'
        #              - excludes JSON file/ object
        #              - leading whitespaces of FastA headers will be deleted
        aln_grp.add_argument('-f', '--fasta', nargs=2, action='append',
                             metavar=('trg:<NAME>', '<FILE>'),
                             help='Pairwise alignment in FastA format, needs '+
                             'to declare what is the target sequence.')
        # JSON input: - right as string or file
        #             - object starts with {, so not allowed for files
        #             - callable only ONCE: should include everything needed
        #             - fields/ objects used: ...
        #             - goes by '--json <OBJECT>|<FILE>'
        #             - excludes '--fasta'
        #             - leading whitespaces of sequence names will be deleted
        aln_grp.add_argument('-j', '--json', metavar='<OBJECT>|<FILE>',
                             help='Alignments provided as JSON file/ object.',
                             action=PM3StoreOnceAction)
class PM3OptionsNamespace(object):
    # class will grow, so for the moment pylint is ignored
    #pylint: disable=too-few-public-methods
    """
    This one is mainly for internal use. You can use it like everything that
    comes out of :meth:`argparse.ArgumentParser.parse_args`. Attributes are
    added regarding how you assembled your argument parser.
    """
    def __init__(self):
        pass

    def PostProcess(self, activated):
        """
        Post processing of activated option packs.

        :param activated: Names of the option packs which were activated on
                          the parser, e.g. ``'ALIGNMENT'``.
        """
        if 'ALIGNMENT' in activated:
            self._PostProcessAlignment()

    def _PostProcessAlignment(self):
        #pylint: disable=no-member
        #pylint: disable=attribute-defined-outside-init
        """
        Doing some extra work after parsing: turn the input of ``--fasta`` or
        ``--json`` into :attr:`alignments` and :attr:`aln_sources`.
        """
        self.aln_sources = list()
        self.alignments = seq.AlignmentList()
        if self.fasta:
            # one alignment and one source per '--fasta' option
            for src in self.fasta:
                seqfile, new_aln = _FetchAlnFromFastaOpt(src)
                self.alignments.append(new_aln)
                self.aln_sources.append(seqfile)
            return
        # Now for JSON input. Since one of the options needs to be given and
        # we already checked for FastA, no need to open a new branch, here.
        # decide if file or object
        json_obj = _GetJSONOBject(self.json)
        for aln in _GetAlnFromJSON(json_obj, self.json):
            self.alignments.append(aln)
        # NOTE(review): the JSON input is recorded once as source for all of
        # its alignments - confirm this is not meant to happen per alignment.
        self.aln_sources.append(self.json)

# LocalWords: param attr prog argparse ArgumentParser bool sys os init str
# LocalWords: progattr descattr argpinit argv formatter meth args namespace
# LocalWords: ArgumentDefaultsHelpFormatter sysargv AssembleParser fasta io
# LocalWords: metavar trg tpl FastA gzip tempfile ost promod aln stderr src
# LocalWords: AssembleTrgTplAln CreateSequence SetSequenceOffset LogError
# LocalWords: LogScript OptionsNamespace PostProcess AssembleAlignment JSON
# LocalWords: AddAlignment AlignmentList SEQNAME whitespaces nargs trgname
# LocalWords: PostProcessAlignment startswith seqfile elif MsgErrorAndExit
# LocalWords: len FileExists gz FileGzip readfile fh NamedTemporaryFile fas
# LocalWords: LoadAlignment exc GetCount fst GetSequence snd