# Source code for promod3.core.pm3argparse

"""
Extensions for the argparse module.
"""

import argparse
import sys
import os
import gzip
import tempfile
#try:
#    import ujson as json
#except ImportError:
import json

import ost
from ost import io, seq

from promod3.core import helper

def _TmpForGZip(filename, suffix, msg_prefix):
    """Unpack a gzipped file into a named temporary file.

    The existence of *filename* is checked with :func:`helper.FileExists`
    (exit code 12) before the content is decompressed completely into memory
    and written to a temporary file.

    :param filename: Path to the gzip compressed file.
    :type filename: :class:`str`

    :param suffix: Suffix for the name of the temporary file, e.g. ``.json``.
    :type suffix: :class:`str`

    :param msg_prefix: Text put in front of error messages.
    :type msg_prefix: :class:`str`

    :returns: The still open temporary file, the path to read from is its
              ``name`` attribute. The file vanishes as soon as the object is
              closed, so callers need to keep it around while reading.

    Problems reading the gzip file are reported via
    :func:`helper.MsgErrorAndExit` with exit code 14.
    """
    helper.FileExists(msg_prefix, 12, filename)
    try:
        # the context manager closes the handle even if read() fails
        with gzip.open(filename) as zip_fh:
            unzip_str = zip_fh.read()
    except IOError as ioe:
        helper.MsgErrorAndExit(msg_prefix + " gzip file '" + filename +
                               "' cannot be opened: " + str(ioe), 14)
    # gzip hands out raw bytes, binary mode writes them without any newline
    # translation
    unzip_file = tempfile.NamedTemporaryFile(mode='wb', suffix=suffix)
    unzip_file.write(unzip_str)
    # make sure the data is on disk before somebody opens the file by name
    unzip_file.flush()
    return unzip_file

def _CheckJSONAlnSeqKeyType(key_name, val_type, json_aln, seqtype, json_source):
    '''Check a key/value in a sequence exists and is of certain type.

    :param key_name: Key which has to exist in ``json_aln[seqtype]``.
    :type key_name: :class:`str`

    :param val_type: Type the value behind *key_name* has to have. The type
                     has to match exactly, subclasses are not accepted. If a
                     string type is requested, any string type is accepted.

    :param json_aln: Single alignment out of the JSON 'alignmentlist'.
    :type json_aln: :class:`dict`

    :param seqtype: Sequence to be checked, key into *json_aln* (the callers
                    in this module use 'target' and 'template').
    :type seqtype: :class:`str`

    :param json_source: Origin of the JSON object, only used for messages.
    :type json_source: :class:`str`

    Failures are reported via :func:`helper.MsgErrorAndExit`: exit code 27 for
    a missing key, 28 for a value of wrong type.
    '''
    if key_name not in json_aln[seqtype]:
        helper.MsgErrorAndExit("JSON 'alignmentlist' '%s' " % seqtype+
                               "from '%s' is " % json_source+
                               "missing the '%s' key" % key_name, 27)
    altype = type(json_aln[seqtype][key_name])

    # The json module may deliver strings as 'unicode' where that type exists,
    # so all string types are treated as interchangeable. 'unicode' is not
    # available on every Python version, hence the guarded lookup.
    try:
        string_types = (str, unicode) #pylint: disable=undefined-variable
    except NameError:
        string_types = (str,)

    if val_type in string_types:
        type_ok = altype in string_types
    else:
        type_ok = altype is val_type
    if not type_ok:
        helper.MsgErrorAndExit("JSON 'alignmentlist' '%s' " % seqtype+
                               "'%s' from " % key_name+
                               "'%s' is not a " % json_source+
                               "%s" % str(val_type), 28)

def _GetAlnFromJSON(json_object, json_source):
    """Create alignments from a JSON object.

    Iterate the alignments in a JSON object and deliver OST alignments via the
    yield operator. Since this is a generator, nothing is checked before the
    first alignment is requested.

    :param json_object: Decoded JSON object. Needs an 'alignmentlist' key
                        pointing to a list of dictionaries, each with a
                        'target' and a 'template' dictionary. Both need
                        'name' and 'seqres' strings, 'template' needs an
                        integer 'offset' on top.
    :type json_object: :class:`dict`

    :param json_source: Origin of the JSON object, only used for messages.
    :type json_source: :class:`str`

    :returns: Generator of pairwise alignments, first sequence with role
              'TARGET', second one with role 'TEMPLATE' and the offset set.

    Malformed input is reported via :func:`helper.MsgErrorAndExit` with exit
    codes 21, 22 and 24 to 28.
    """
    # alignments are stored via the 'alignmentlist' key, anything but a
    # dictionary on top level can not provide it
    if not isinstance(json_object, dict) or \
       'alignmentlist' not in json_object:
        helper.MsgErrorAndExit("JSON object from '%s' does not " % json_source+
                               "provide an 'alignmentlist' key.", 21)
    # alignments come as lists, to enable hetero oligos
    if not isinstance(json_object['alignmentlist'], list):
        helper.MsgErrorAndExit("JSON object from '%s' does not " % json_source+
                               "provide a list behind 'alignmentlist'.", 24)
    # take the alignments apart, each alignment is a dictionary
    for json_aln in json_object['alignmentlist']:
        # json_aln needs to be a dictionary (offender is wrapped in a tuple so
        # it is always formatted as a single value)
        if not isinstance(json_aln, dict):
            helper.MsgErrorAndExit("JSON 'alignmentlist' member from "+
                                   "'%s' is not a " % json_source+
                                   "dictionary: %s" % (json_aln,), 25)
        # an alignment has a 'target' and a 'template' dictionary
        # each of them has a 'name' and a 'seqres' pair
        for flav in ['target', 'template']:
            if flav not in json_aln:
                helper.MsgErrorAndExit("JSON 'alignmentlist' from "+
                                       "'%s' does not " % json_source+
                                       "provide a '%s' key." % flav, 22)
            # check sequence to be dictionary
            if not isinstance(json_aln[flav], dict):
                helper.MsgErrorAndExit("JSON 'alignmentlist' '%s' from " % flav+
                                       "'%s' is not a " % json_source+
                                       "dictionary: %s" % (json_aln[flav],),
                                       26)
            # check for keys needed by both sequences:
            for aln_key in ['name', 'seqres']:
                _CheckJSONAlnSeqKeyType(aln_key, str, json_aln, flav,
                                        json_source)
        # the offset is only defined for the template
        _CheckJSONAlnSeqKeyType('offset', int, json_aln, 'template',
                                json_source)
        # create and yield alignment
        trg_name = str(json_aln['target']['name']).strip()
        trg_seq = str(json_aln['target']['seqres'])
        tpl_name = str(json_aln['template']['name']).strip()
        tpl_seq = str(json_aln['template']['seqres'])
        new_aln = seq.CreateAlignment(seq.CreateSequence(trg_name, trg_seq),
                                      seq.CreateSequence(tpl_name, tpl_seq))
        new_aln.SetSequenceRole(0, 'TARGET')
        new_aln.SetSequenceRole(1, 'TEMPLATE')
        new_aln.SetSequenceOffset(1, json_aln['template']['offset'])
        yield new_aln

def _GetJSONOBject(json_input):
    """Get a JSON object out of a string which may be an object or a path.

    If the input string starts with '{', we assume its a JSON object. File names
    starting with '{' would be a bit weird.

    If we are looking at a file, check and load it. Files may be gzipped, in
    that case they are unpacked into a temporary file first.

    For a JSON object, check that everything is there. No checks for
    superfluous stuff.

    As returnvalue we only use JSON objects.

    :param json_input: JSON formatted string or path to a JSON file.
    :type json_input: :class:`str`

    :returns: The decoded JSON object.

    Problems are reported via :func:`helper.MsgErrorAndExit`: exit code 19 if
    the file can not be opened, 20 if nothing could be decoded from the file
    and 23 if the string can not be decoded. Other decoding errors of files
    are raised as :exc:`ValueError`.
    """
    # startswith() instead of [0]: an empty string is treated as (non
    # existing) file name instead of raising an IndexError
    if not json_input.startswith('{'):
        helper.FileExists("JSON Alignment", 12, json_input)
        is_gz = helper.FileGzip("JSON alignment", 13, json_input)
        readfile = json_input
        unzip_file = None
        try:
            if is_gz:
                unzip_file = _TmpForGZip(json_input, '.json', "JSON alignment")
                readfile = unzip_file.name
            try:
                jfh = open(readfile)
            except IOError as ioe:
                helper.MsgErrorAndExit("'--json' file '%s' " % json_input+
                                       "can not be processed: %s" % ioe.strerror,
                                       19)
            # the file is closed by 'with', also if decoding fails
            with jfh:
                try:
                    json_object = json.load(jfh)
                except ValueError as vae:
                    if str(vae) == 'No JSON object could be decoded':
                        helper.MsgErrorAndExit("'--json' file '%s' could " %
                                               json_input+
                                               "not be processed into a JSON "+
                                               "object, probably it's empty.",
                                               20)
                    else:
                        raise
        finally:
            # closing the temporary file removes it
            if unzip_file is not None:
                unzip_file.close()
    else:
        try:
            json_object = json.loads(json_input)
        except ValueError as vae:
            helper.MsgErrorAndExit("'--json' string '%s' " % json_input+\
                                   "could not be decoded: %s" % str(vae), 23)
    return json_object

def _FetchAlnFromFile(seqfile, allow_multitemplate, format):
    #pylint: disable=redefined-builtin
    """Read alignment from seqfile and return it.

    The file may be gzipped, then it is unpacked into a temporary file before
    loading. The sequence named "trg" or "target" (case insensitive, at most
    one allowed) is moved to the front of the alignment. Without such a name,
    the first sequence stays the target.

    :param seqfile: Path to the alignment file.
    :type seqfile: :class:`str`

    :param allow_multitemplate: If ``False``, alignments with more than 2
                                sequences are rejected.
    :type allow_multitemplate: :class:`bool`

    :param format: Format handed to :func:`ost.io.LoadAlignment`, also used
                   to build the option name in messages (callers in this
                   module use "fasta" and "clustal").
    :type format: :class:`str`

    :returns: New alignment, first sequence has role 'TARGET', all others
              'TEMPLATE'.

    Problems are reported via :func:`helper.MsgErrorAndExit` with exit codes
    12 and 14 to 18.
    """
    argstr = "'--" + format + " " + seqfile + "'"
    helper.FileExists("Alignment", 12, seqfile)
    # checking if alignment file has 'gz' extension
    is_gz = helper.FileGzip("Alignment", 13, seqfile)
    # loading the alignment, switch for gzip
    readfile = seqfile
    if is_gz:
        unzip_file = _TmpForGZip(seqfile, '.fas', "Alignment")
        readfile = unzip_file.name
    try:
        aln = io.LoadAlignment(readfile, format=format)
    except Exception as exc: #pylint: disable=broad-except
        # str(exc) instead of the deprecated 'message' attribute
        if str(exc) in ['Bad FASTA file: File is empty',
                        'Bad CLUSTAL file: File is empty']:
            helper.MsgErrorAndExit(argstr +  " refers to an empty file or " +
                                   "its in the wrong format.", 15)
        else:
            helper.MsgErrorAndExit(argstr + ": error when reading alignment "+
                                   "file: " + str(exc), 18)
    finally:
        # closing removes the temporary file
        if is_gz:
            unzip_file.close()
    # checking the alignment
    if aln.GetCount() == 1:
        helper.MsgErrorAndExit(argstr + " points to an alignment with only " +
                               "1 sequence.", 16)
    if aln.GetCount() > 2 and not allow_multitemplate:
        helper.MsgErrorAndExit(argstr + " points to an alignment with more " +
                               "than 2 sequences and we do not allow this.", 16)
    # identify target
    target_idx = -1
    sequences = [(s.name.strip(), s.string) for s in aln.sequences]
    for i, (seq_name, _) in enumerate(sequences):
        if seq_name.lower() in ['trg', 'target']:
            if target_idx >= 0:
                helper.MsgErrorAndExit(argstr + ": multiple targets found!", 17)
            target_idx = i
    # reshuffle: move target to the front, order of templates is kept
    if target_idx > 0:
        sequences.insert(0, sequences.pop(target_idx))
    # generate alignment
    new_aln = seq.CreateAlignment()
    for seq_name, seq_string in sequences:
        new_aln.AddSequence(seq.CreateSequence(seq_name, seq_string))
    new_aln.SetSequenceRole(0, 'TARGET')
    for i in range(1, new_aln.GetCount()):
        new_aln.SetSequenceRole(i, 'TEMPLATE')

    return new_aln

def _LoadPDB(filename):
    """Load PDB file from filename and return it.

    :param filename: Path to the PDB file.
    :type filename: :class:`str`

    :returns: The structure as returned by :func:`ost.io.LoadPDB`.

    Problems are reported via :func:`helper.MsgErrorAndExit`: exit code 32 is
    handed to the check for the file to exist, 33 is used if loading fails.
    """
    argstr = "'--pdb " + filename + "'"
    helper.FileExists("PDB Structure", 32, filename)
    try:
        ent = io.LoadPDB(filename)
    except Exception as exc: #pylint: disable=broad-except
        helper.MsgErrorAndExit(argstr + ": failure to parse PDB file: " +
                               str(exc), 33)
    return ent

def _LoadEntity(filename):
    """Load generic structure file from filename and return it.

    :param filename: Path to the structure file.
    :type filename: :class:`str`

    :returns: The structure as returned by :func:`ost.io.LoadEntity`.

    Problems are reported via :func:`helper.MsgErrorAndExit`: exit code 32 is
    handed to the check for the file to exist, 34 is used if no IO handler is
    found for the file and 33 for any other failure while loading.
    """
    argstr = "'--entity " + filename + "'"
    helper.FileExists("Structure", 32, filename)
    try:
        ent = io.LoadEntity(filename)
    except Exception as exc: #pylint: disable=broad-except
        # str(exc) works for any exception, the 'message' attribute is
        # deprecated and not guaranteed to be a string
        if str(exc).startswith('no suitable entity io handler found'):
            helper.MsgErrorAndExit(argstr + ": not a supported format " +
                                   str(exc), 34)
        else:
            helper.MsgErrorAndExit(argstr + ": failure to parse PDB file: " +
                                   str(exc), 33)
    return ent

def _GetChains(structures, structure_sources):
    """Build the mapping (dict) of chain IDs to peptide views.

    Keys created (<BASE> = file name without directory and extension, for
    '.gz' files the extension in front of '.gz' is removed too):

    - 'UNIQUE': the only entry if there is one file with one peptide chain
    - <BASE>.<CHAIN> and <BASE>: file with a single peptide chain, if several
      files are given
    - <BASE>.<CHAIN>: each chain of a file with any other number of peptide
      chains, additionally <CHAIN> if only one file is given

    :param structures: Loaded structures, same order as *structure_sources*.
    :param structure_sources: File names the structures were loaded from.
    :type structure_sources: :class:`list` of :class:`str`

    :returns: :class:`dict` with views as values.
    """
    views_by_id = dict()
    only_one_file = (len(structure_sources) == 1)
    for src_path, entity in zip(structure_sources, structures):
        # strip directory and extension(s) from the file name
        stem, ext = os.path.splitext(os.path.basename(src_path))
        if ext == '.gz':
            stem = os.path.splitext(stem)[0]
        # only peptide chains are of interest
        peptides = entity.Select("peptide=True")
        only_one_chain = (peptides.chain_count == 1)
        names = [chain.name for chain in peptides.chains]
        if only_one_chain:
            # whole selection is the chain, no further selection needed
            if only_one_file:
                views_by_id['UNIQUE'] = peptides
            else:
                views_by_id[stem + '.' + names[0]] = peptides
                views_by_id[stem] = peptides
            continue
        for name in names:
            chain_view = peptides.Select("cname=" + name)
            views_by_id[stem + '.' + name] = chain_view
            # short ID is only unambiguous with a single file
            if only_one_file:
                views_by_id[name] = chain_view
    return views_by_id

def _AttachViewsToAln(aln, chain_entities):
    """Attach views to tpl. sequences in aln according to sequence names.

    All sequences but the first one are processed. Their names are expected
    to be of the form ``<ID>`` or ``<ID>|<OFFSET>``. A given offset is set for
    the sequence, as long as it does not contradict a positive offset already
    stored in the alignment. ``<ID>`` is used to look up the view to attach,
    unless *chain_entities* only consists of the 'UNIQUE' entry, which is
    attached to every sequence then.

    :param aln: Alignment to be modified in place.
    :param chain_entities: Mapping of chain IDs to views.
    :type chain_entities: :class:`dict`

    Problems are reported via :func:`helper.MsgErrorAndExit` with exit codes
    42 to 45.
    """
    for i in range(1, aln.GetCount()):
        seq_name = aln.GetSequence(i).GetName()
        # extract offset
        my_split = seq_name.split('|')
        tpl_id = my_split[0].strip()
        if len(my_split) == 2:
            offset_str = my_split[1].strip()
            if offset_str.isdigit():
                # set offset
                tpl_offset = int(offset_str)
                # mismatch with existing one?
                old_offset = aln.GetSequenceOffset(i)
                if old_offset > 0 and old_offset != tpl_offset:
                    helper.MsgErrorAndExit("Inconsistent offsets between seq."+
                                           " name and seq. in alignment for " +
                                           seq_name, 42)
                else:
                    aln.SetSequenceOffset(i, tpl_offset)
            else:
                helper.MsgErrorAndExit("Non-integer offset defined in seq. "+
                                       "name " + seq_name, 43)
        elif len(my_split) > 2:
            helper.MsgErrorAndExit("Too many '|' in seq. name " + seq_name, 44)
        # identify chain and attach view
        # (asking for the 'UNIQUE' key avoids a KeyError for other dicts of
        # length 1, those are handled by the lookup via ID)
        if len(chain_entities) == 1 and 'UNIQUE' in chain_entities:
            aln.AttachView(i, chain_entities['UNIQUE'].CreateFullView())
        elif tpl_id in chain_entities:
            aln.AttachView(i, chain_entities[tpl_id].CreateFullView())
        else:
            helper.MsgErrorAndExit("Could not find chain with ID " + tpl_id +
                                   " (should be <FILE>.<CHAIN>) to attach to"+
                                   " sequence named " + seq_name, 45)

class PM3ArgumentParser(argparse.ArgumentParser):
    """
    This class is a child of :class:`argparse.ArgumentParser`. It provides a
    set of standard arguments which can be activated with :meth:`Add*` methods
    and then assembled with :meth:`AssembleParser`. This helps keeping up a
    common naming scheme throughout all |project| actions. As a real extension,
    this subclass provides checking of input parameters on :meth:`Parse`.
    Besides this, everything you can do with a 'real'
    :class:`~argparse.ArgumentParser` instance is possible here.

    Attributes beyond :class:`argparse.ArgumentParser`:

    .. attribute:: action

      Indicates if the calling script is a |project| action.

      :type: :class:`bool`
    """
    def __init__(self, description, action=True):
        """
        Create a new instance of :class:`~pm3argparse.PM3ArgumentParser`.

        :param description: Help text for this script, handed down to
                            |descattr|_ of |argpinit|_.
        :type description: :class:`str`

        :param action: Indicates if the calling script is a |project| action.
                       This influences |progattr|_ of
                       :class:`~argparse.ArgumentParser` by clipping off the
                       first 3 characters of the file name of the script. If
                       ``False``, default behaviour of
                       :class:`~argparse.ArgumentParser` kicks in.
        :type action: :class:`bool`

        :returns: :class:`argparse.ArgumentParser`.
        """
        prog = None
        if action:
            # drop the first 3 characters of the script name
            prog = os.path.basename(sys.argv[0])[3:]
        argparse.ArgumentParser.__init__(
            self, prog=prog, description=description,
            formatter_class=argparse.RawDescriptionHelpFormatter)
        self.action = action
        # names of the option packs requested via the Add* methods
        self.activate = set()

    def _print_message(self, message, file=None): #pylint: disable=redefined-builtin
        """
        This is like a welcome message to the "country of bad style"... we are
        overwriting a "_" function from the parent-class. Those guys should not
        be used outside of the housing module, never... but here it is a single
        function to bend :mod:`argparse` to use :class:`ost.Logger`.

        :param message: Text to be logged, a single trailing newline is
                        removed. Nothing happens for empty messages.
        :param file: Messages for ``None`` or :data:`sys.stderr` are logged
                     as errors, everything else as script output.
        """
        if message:
            no_nl_msg = message
            if message[-1] == '\n':
                no_nl_msg = message[:-1]
            if file is None or file is sys.stderr:
                ost.LogError(no_nl_msg)
            else:
                ost.LogScript(no_nl_msg)

    def Parse(self, args=None):
        """
        Parse an argument string. See :meth:`Add*` methods.

        Options/arguments added by default: ``-h/--help`` shows usage.

        General exit codes:

        * 1 - an unhandled exception was raised
        * 2 - arguments cannot be parsed or required arguments are missing

        :param args: The argument string. As default |sysargv|_ is used.
        :type args: :class:`list`

        :returns: Namespace filled with attributes (see :meth:`Add*` methods).
        """
        opts = PM3OptionsNamespace()
        self.parse_args(args=args, namespace=opts)
        # load and check the input of all activated option packs
        opts.PostProcess(self.activate)
        return opts

    def AssembleParser(self):
        """
        When adding options via the :meth:`Add*` methods, call this after you
        are done. Everything before just tells the parser that it should
        contain those option sets but does not actually add anything.
        :meth:`AssembleParser` will put everything in place, in the right order
        and with the right constraints.
        """
        if 'ALIGNMENT' in self.activate:
            self._AssembleAlignment()
        if 'STRUCTURE' in self.activate:
            self._AssembleStructure()

    def AddAlignment(self, allow_multitemplate=False):
        """Commandline options for alignments.

        Activate everything needed to load alignments to the argument parser.
        Command line arguments are then added in :meth:`AssembleParser` and the
        input is post processed and checked in :meth:`Parse`.

        :param allow_multitemplate: enable support for multitemplate alignments
        :type allow_multitemplate: :class:`bool`

        Options/arguments added:

        * ``-f/--fasta <FILE>`` - Target-template alignment in FASTA format.
          Target sequence is either named "trg" or "target" or the first
          sequence is used. File can be plain or gzipped.

        * ``-c/--clustal <FILE>`` - Target-template alignment in CLUSTAL
          format. Target sequence is either named "trg" or "target" or the
          first sequence is used. File can be plain or gzipped.

        * ``-j/--json <OBJECT>|<FILE>`` - Alignments provided as JSON
          file/object. File can be plain or gzipped.

        See :ref:`here <promod-build-model>` for details on the file formats.

        Attributes added to the namespace returned by :meth:`Parse`:

        * :attr:`fasta` - filled with the input of the ``--fasta`` option, a
          :class:`list` of :class:`str` (filenames).

        * :attr:`clustal` - filled with the input of the ``--clustal`` option,
          a :class:`list` of :class:`str` (filenames).

        * :attr:`json` - filled with the input of the ``--json`` option, a
          :class:`list` of :class:`str`, where each string may be a filename
          or a JSON object string.

        * :attr:`alignments` - :class:`ost.AlignmentList`, same order as
          given. First sequence of the alignment is the target sequence, if
          in doubt, check for sequence roles ``TARGET`` or ``TEMPLATE``

        * :attr:`aln_sources` - :class:`list` of :class:`str` with the
          original source(s) of the alignment: may be filename(s) or JSON
          strings.

        Exit codes related to alignment input:

        * 12 - a given alignment file does not exist
        * 13 - never raised (parameter for checking gzip files)
        * 14 - gzip file cannot be opened
        * 15 - found an empty alignment file
        * 16 - unsupported number of sequences in alignment: only 1 sequence
          or (unless *allow_multitemplate* = True) more than 2 sequences
        * 17 - multiple target sequences found in alignment
        * 18 - error when reading fasta/clustal file
        * 19 - problem with a JSON formatted file handed over to ``--json``
        * 20 - JSON file could not be decoded into a JSON object
        * 21 - JSON object has no 'alignmentlist' key
        * 22 - JSON object has no 'target'/'template' in the 'alignmentlist'
        * 23 - JSON string could not be decoded
        * 24 - JSON object 'alignmentlist' does not point to a list
        * 25 - JSON object 'alignmentlist' member is not a dictionary
        * 26 - JSON object 'alignmentlist' 'target'/'template' does not point
          to a dictionary
        * 27 - JSON object 'alignmentlist' 'target'/'template' does not have
          a needed key
        * 28 - JSON object 'alignmentlist' 'target'/'template' has a value of
          wrong type
        """
        self.activate.add('ALIGNMENT')
        if allow_multitemplate:
            self.activate.add('ALLOW_MULTITEMPLATE')

    def AddStructure(self, attach_views=False):
        """Commandline options for structures.

        Activate everything needed to load structures to the argument parser.
        Command line arguments are then added in :meth:`AssembleParser` and the
        input is post processed and checked in :meth:`Parse`.

        :param attach_views: if True: attach views to alignments. Requires
                             call to :meth:`AddAlignment`. Chains for each
                             sequence are identified based on the sequence
                             name of the templates in the alignments (see
                             :ref:`here <promod-build-model>` for details).
        :type attach_views: :class:`bool`

        Options/arguments added:

        * ``-p/--pdb <FILE>`` - Structure in PDB format. File can be plain or
          gzipped.

        * ``-e/--entity <FILE>`` - Structure in any format readable by the
          :meth:`ost.io.LoadEntity` method. Format is chosen by file ending.
          Recognized File Extensions: .ent, .pdb, .ent.gz, .pdb.gz, .cif,
          .cif.gz.

        Notes:

        * one of the inputs must be given and only one type of input
          acceptable
        * callable multiple times (structures appended in given order)

        Attributes added to the namespace returned by :meth:`Parse`:

        * :attr:`pdb` - filled with the input of the ``--pdb`` option, a
          :class:`list` of :class:`str` (filenames).

        * :attr:`entity` - filled with the input of the ``--entity`` option,
          a :class:`list` of :class:`str` (filenames).

        * :attr:`structures` - :class:`list` of :class:`ost.EntityHandle`,
          same order as given.

        * :attr:`structure_sources` - :class:`list` of :class:`str` with the
          original filenames of the structures.

        Exit codes related to structure input:

        * 32 - a given structure file does not exist
        * 33 - failure to read a given structure file
        * 34 - file ending is not a supported format

        Exit codes if *attach_views* = True:

        * 41 - attach_views used without adding alignments
        * 42 - inconsistent offsets between seq. name and seq. in alignment
        * 43 - non-integer offset defined in seq. name
        * 44 - too many "|" in seq. name
        * 45 - chain to attach to sequence could not be identified
        """
        self.activate.add('STRUCTURE')
        if attach_views:
            self.activate.add('ATTACH_VIEWS')

    def _AssembleAlignment(self):
        """Actually add alignment arguments/options."""
        # exactly one kind of alignment input has to be given
        aln_grp = self.add_mutually_exclusive_group(required=True)
        # fasta input
        aln_grp.add_argument('-f', '--fasta', metavar=('<FILE>'),
                             help='Target-template alignment in FASTA format. '+
                             'Target sequence is either named "trg" or '+
                             '"target" or the first sequence is used. '+
                             'File can be plain or gzipped.',
                             action='append', default=list())
        # clustal input
        aln_grp.add_argument('-c', '--clustal', metavar=('<FILE>'),
                             help='Target-template alignment in CLUSTAL format. '+
                             'Target sequence is either named "trg" or '+
                             '"target" or the first sequence is used. '+
                             'File can be plain or gzipped.',
                             action='append', default=list())
        # JSON input
        aln_grp.add_argument('-j', '--json', metavar='<OBJECT>|<FILE>',
                             help='Alignments provided as JSON file/object. '+
                             'File can be plain or gzipped.',
                             action='append', default=list())

    def _AssembleStructure(self):
        """Actually add structure arguments/options."""
        # exactly one kind of structure input has to be given
        struct_grp = self.add_mutually_exclusive_group(required=True)
        # pdb input
        struct_grp.add_argument('-p', '--pdb', metavar=('<FILE>'),
                                help='Structure in PDB format. '+
                                'File can be plain or gzipped.',
                                action='append', default=list())
        # any OST entity
        struct_grp.add_argument('-e', '--entity', metavar=('<FILE>'),
                                help="Structure in any format readable by OST's "+
                                "io.LoadEntity method. Format is chosen by file "+
                                "ending. Recognized File Extensions: .ent, .pdb, "+
                                ".ent.gz, .pdb.gz, .cif, .cif.gz.",
                                action='append', default=list())
class PM3OptionsNamespace(object):
    # class will grow, so for the moment pylint is ignored
    #pylint: disable=too-few-public-methods
    """Output of :meth:`PM3ArgumentParser.Parse`.

    Like output of :meth:`argparse.ArgumentParser.parse_args` with additional
    functions for convenience.
    """
    def __init__(self):
        pass

    def PostProcess(self, activated):
        """Post processing of activated option packs.

        :param activated: Names of the option packs to be processed
                          ('ALIGNMENT', 'STRUCTURE', 'ATTACH_VIEWS',
                          'ALLOW_MULTITEMPLATE').
        """
        #pylint: disable=attribute-defined-outside-init
        self.allow_multitemplate = 'ALLOW_MULTITEMPLATE' in activated
        if 'ALIGNMENT' in activated:
            self._PostProcessAlignment()
        if 'STRUCTURE' in activated:
            self._PostProcessStructure()
        # views go last, they need alignments and structures to be loaded
        if 'ATTACH_VIEWS' in activated:
            self._AttachViews()

    def _PostProcessAlignment(self):
        #pylint: disable=no-member
        #pylint: disable=attribute-defined-outside-init
        """Get alignments from command line input.

        Fills :attr:`alignments` and :attr:`aln_sources` from the
        :attr:`fasta`, :attr:`clustal` and :attr:`json` attributes, in this
        order.
        """
        self.aln_sources = list()
        self.alignments = seq.AlignmentList()
        # parse fasta files
        for src in self.fasta:
            new_aln = _FetchAlnFromFile(src, self.allow_multitemplate, "fasta")
            self.alignments.append(new_aln)
            self.aln_sources.append(src)
        # parse clustal files
        for src in self.clustal:
            new_aln = _FetchAlnFromFile(src, self.allow_multitemplate,
                                        "clustal")
            self.alignments.append(new_aln)
            self.aln_sources.append(src)
        # parse JSON input, a single source may provide several alignments
        for src in self.json:
            json_obj = _GetJSONOBject(src)
            for aln in _GetAlnFromJSON(json_obj, src):
                self.alignments.append(aln)
                self.aln_sources.append(src)

    def _PostProcessStructure(self):
        #pylint: disable=no-member
        #pylint: disable=attribute-defined-outside-init
        """Get structures from command line input.

        Fills :attr:`structures` and :attr:`structure_sources` from the
        :attr:`pdb` and :attr:`entity` attributes, in this order.
        """
        self.structures = list()
        self.structure_sources = list()
        # parse pdb files
        for src in self.pdb:
            self.structures.append(_LoadPDB(src))
            self.structure_sources.append(src)
        # parse generic structures
        for src in self.entity:
            self.structures.append(_LoadEntity(src))
            self.structure_sources.append(src)

    def _AttachViews(self):
        """Attach views to tpl. sequences according to sequence names."""
        if not (hasattr(self, 'structures') and hasattr(self, 'alignments')):
            helper.MsgErrorAndExit("Need to have structures and alignments to "+
                                   "attach views.", 41)
        # get chain id to entity view (single chain) mapping (dict)
        chain_entities = _GetChains(self.structures, self.structure_sources)
        # go through all templates in all alignments
        for aln in self.alignments:
            _AttachViewsToAln(aln, chain_entities)

# LocalWords: param attr prog argparse ArgumentParser bool sys os init str
# LocalWords: progattr descattr argpinit argv formatter meth args namespace
# LocalWords: ArgumentDefaultsHelpFormatter sysargv AssembleParser fasta io
# LocalWords: metavar trg tpl FastA gzip tempfile ost promod aln stderr src
# LocalWords: AssembleTrgTplAln CreateSequence SetSequenceOffset LogError
# LocalWords: LogScript OptionsNamespace PostProcess AssembleAlignment JSON
# LocalWords: AddAlignment AlignmentList SEQNAME whitespaces nargs trgname
# LocalWords: PostProcessAlignment startswith seqfile elif MsgErrorAndExit
# LocalWords: len FileExists gz FileGzip readfile fh NamedTemporaryFile fas
# LocalWords: LoadAlignment exc GetCount fst GetSequence snd