"""
Extensions for the argparse module.
"""
import argparse
import sys
import os
import gzip
import tempfile
#try:
# import ujson as json
#except ImportError:
import json
import ost
from ost import io, seq
from promod3.core import helper
def _TmpForGZip(filename, suffix, msg_prefix):
    """Unpack a gzipped file into a temporary file.

    :param filename: Path of the gzipped file to unpack.
    :param suffix: Suffix handed to :class:`tempfile.NamedTemporaryFile`, so
                   the temporary file carries a meaningful extension.
    :param msg_prefix: Prefix handed to :func:`helper.FileExists` together
                       with exit code 12 for its existence check.

    :returns: The open temporary file holding the unpacked data. Read it via
              its ``name`` attribute; with the default settings of
              :class:`tempfile.NamedTemporaryFile` it vanishes once closed, so
              the caller has to keep it open while needed and close it after.
    """
    helper.FileExists(msg_prefix, 12, filename)
    # 'with' closes the gzip handle even if reading fails half way through
    with gzip.open(filename) as zip_fh:
        unzip_str = zip_fh.read()
    # gzip.open() reads binary data by default, so write it back verbatim
    unzip_file = tempfile.NamedTemporaryFile(mode='wb', suffix=suffix)
    try:
        unzip_file.write(unzip_str)
        # make sure the data is on disk before somebody opens the file by name
        unzip_file.flush()
    except Exception:
        # do not leave a half written temporary file behind
        unzip_file.close()
        raise
    return unzip_file
def _CheckJSONAlnSeqKeyType(key_name, val_type, json_aln, seqtype, json_source):
'''Check a key/ value in a sequence exists and is of certain type.
'''
if key_name not in json_aln[seqtype].keys():
helper.MsgErrorAndExit("JSON 'alignmentlist' '%s' " % seqtype+
"from '%s' is " % json_source+
"missing the '%s' key" % key_name, 27)
altype = type(json_aln[seqtype][key_name])
if val_type is str or val_type is unicode:
if not (altype is unicode or altype is str):
helper.MsgErrorAndExit("JSON 'alignmentlist' '%s' " % seqtype+
"'%s' from" % key_name+
"'%s' is not a " % json_source+
"%s" % str(val_type), 28)
elif not altype is val_type:
helper.MsgErrorAndExit("JSON 'alignmentlist' '%s' " % seqtype+
"'%s' from" % key_name+
"'%s' is not a " % json_source+
"%s" % str(val_type), 28)
def _CreateNewAln(trg_name, trg_seq, trg_start, trg_end, tpl_name, tpl_seq,
                  tpl_offset):
    # internal function to make things easier in other places, pylint ignored
    #pylint: disable=too-many-arguments
    '''Build a pairwise target-template alignment.

    The target sequence is cut down to ``trg_seq[trg_start:trg_end]`` while the
    template sequence is taken as is. The first sequence gets the role
    'TARGET', the second one 'TEMPLATE' plus ``tpl_offset`` as its sequence
    offset.
    '''
    target = seq.CreateSequence(trg_name, trg_seq[trg_start:trg_end])
    template = seq.CreateSequence(tpl_name, tpl_seq)
    aln = seq.CreateAlignment(target, template)
    for idx, role in enumerate(('TARGET', 'TEMPLATE')):
        aln.SetSequenceRole(idx, role)
    aln.SetSequenceOffset(1, tpl_offset)
    return aln
def _GetAlnFromJSON(json_object, json_source):
"""Create alignments from a JSON object.
Iterate the alignments in a JSON object and deliver OST alignments via the
yield operator.
"""
# alignments are stored via the 'alignmentlist' key
if 'alignmentlist' not in json_object.keys():
helper.MsgErrorAndExit("JSON object from '%s' does not " % json_source+
"provide an 'alignmentlist' key.", 21)
# alignments come as lists, to enable hetero oligos
if not type(json_object['alignmentlist']) is list:
helper.MsgErrorAndExit("JSON object from '%s' does not" % json_source+
"provide a list behind 'alignmentlist'.", 24)
# take the alignments apart, each alignment is a dictionary
for json_aln in json_object['alignmentlist']:
# json_aln needs to be a dictionary
if not type(json_aln) is dict:
helper.MsgErrorAndExit("JSON 'alignmentlist' member from "+
"'%s' is not a ' " % json_source+
" dictionary: %s" % json_aln, 25)
# an alignment has a 'target' and a 'template' dictionary
# each of them has a 'name' and a 'seqres' pair
for flav in ['target', 'template']:
if flav not in json_aln.keys():
helper.MsgErrorAndExit("JSON 'alignmentlist' from "+
"'%s' does not " % json_source+
"provide a '%s' key." % flav, 22)
# check sequence to be dictionary
if not type(json_aln[flav]) is dict:
helper.MsgErrorAndExit("JSON 'alignmentlist' '%s' from" % flav+
"'%s' is not a " % json_source+
"dictionary: %s" % json_aln[flav], 26)
# check for keys needed by both sequences:
for aln_key in ['name', 'seqres']:
_CheckJSONAlnSeqKeyType(aln_key, str, json_aln, flav,
json_source)
_CheckJSONAlnSeqKeyType('offset', int, json_aln, 'template',
json_source)
yield _CreateNewAln(str(json_aln['target']['name']).strip(),
str(json_aln['target']['seqres']),
0,
len(json_aln['target']['seqres']),
str(json_aln['template']['name']).strip(),
str(json_aln['template']['seqres']),
json_aln['template']['offset'])
def _GetJSONOBject(json_input):
"""Get a JSON object out of a string which may be an object or a path.
If the input string starts with '{', we assume its a JSON object. File names
starting with '{' would be a bit weird.
If we are looking at a file, check and load it.
For a JSON object, check that everything is there. No checks for
superfluous stuff.
As returnvalue we only use JSON objects.
"""
if json_input[0] != '{':
is_gz = helper.FileGzip("JSON alignment", 13, json_input)
readfile = json_input
if is_gz:
unzip_file = _TmpForGZip(json_input, '.json', "JSON alignment")
readfile = unzip_file.name
try:
jfh = open(readfile)
except IOError, ioe:
helper.MsgErrorAndExit("'--json' file '%s' " % json_input+
"can not be processed: %s" % ioe.strerror,
19)
except:
raise
try:
json_object = json.load(jfh)
except ValueError, vae:
if vae.message == 'No JSON object could be decoded':
helper.MsgErrorAndExit("'--json' file '%s' could " % json_input+
"not be processed into a JSON object, "+
"probably it's empty.", 20)
else:
raise
except:
raise
jfh.close()
else:
try:
json_object = json.loads(json_input)
except ValueError, vae:
helper.MsgErrorAndExit("'--json' string '%s' " % json_input+\
"could not be decoded: %s" % vae.message, 23)
return json_object
def _GetTrgNameSeqFile(argstr):
    """Split the two arguments of '--fasta' into target name and file name.

    The two items may come in any order, the one prefixed with 'trg:' is the
    target name, the other one the sequence file. As a consequence, names of
    sequence files may not start with 'trg:'.

    Reports via :func:`helper.MsgErrorAndExit` with code 11 if no item carries
    the prefix and with code 14 if nothing follows the prefix. The sequence
    file is handed to :func:`helper.FileExists` with code 12.

    :returns: Target name without prefix and the sequence file name.
    """
    first_arg = argstr[0]
    second_arg = argstr[1]
    if first_arg.startswith('trg:'):
        trgname, seqfile = first_arg[4:], second_arg
    elif second_arg.startswith('trg:'):
        trgname, seqfile = second_arg[4:], first_arg
    else:
        helper.MsgErrorAndExit("'--fasta %s' requires " % ' '.join(argstr)+
                               "one argument prefixed with 'trg:' marking "+
                               "the target sequence name", 11)
    # a bare 'trg:' without a name attached is of no use
    if len(trgname) == 0:
        helper.MsgErrorAndExit("'--fasta %s' requires " % ' '.join(argstr)+
                               "argument 'trg:' defining the "+
                               "target sequence name, empty one "+
                               "found: '%s'" % ' '.join(argstr), 14)
    helper.FileExists("Alignment", 12, seqfile)
    return trgname, seqfile
def _FetchAlnFromFastaOpt(argstr):
    """
    Disassemble an argument to '--fasta' into an alignment and return.

    :param argstr: The two items given to '--fasta': the target name prefixed
                   with 'trg:' and the name of the alignment file, any order.

    :returns: Name of the alignment file and the target-template alignment
              produced by :func:`_AssembleTrgTplAln`.

    Problems are reported via :func:`helper.MsgErrorAndExit`: code 15 for an
    empty/ malformed file, 18 for sequences of different length, 16 for more
    than 2 sequences and 17 if no sequence carries the target name. Any other
    error raised while loading is passed on to the caller.
    """
    trgname, seqfile = _GetTrgNameSeqFile(argstr)
    # checking if alignment file has 'gz' extension
    is_gz = helper.FileGzip("Alignment", 13, seqfile)
    # loading the alignment, switch for gzip
    readfile = seqfile
    unzip_file = None
    if is_gz:
        unzip_file = _TmpForGZip(seqfile, '.fas', "Alignment")
        readfile = unzip_file.name
    try:
        aln = io.LoadAlignment(readfile, format="fasta")
    except Exception as exc: #pylint: disable=broad-except
        # str() instead of the deprecated 'message' attribute of exceptions
        reason = str(exc)
        if reason == 'Bad FASTA file: File is empty':
            helper.MsgErrorAndExit("'--fasta %s' " % ' '.join(argstr)+
                                   "refers to an empty file or its in the "+
                                   "wrong format.", 15)
        elif reason == 'sequences have different lengths':
            helper.MsgErrorAndExit("'--fasta %s': " % ' '.join(argstr)+
                                   "sequences in the alignment "+
                                   "have different length.", 18)
        else:
            raise
    finally:
        # the temporary file is only needed for loading
        if unzip_file is not None:
            unzip_file.close()
    # checking the alignment: only 2 sequences allowed, target name must be
    # the name of one of the sequences
    # NOTE(review): an alignment with fewer than 2 sequences is not rejected
    # here, fetching the second sequence below is left to deal with it
    nos = aln.GetCount()
    if nos > 2:
        helper.MsgErrorAndExit("'--fasta %s' points to " % ' '.join(argstr)+
                               "an alignment with more than 2 sequences.",
                               16)
    fst_seq = aln.GetSequence(0)
    snd_seq = aln.GetSequence(1)
    if fst_seq.name.strip() == trgname:
        new_aln = _AssembleTrgTplAln(fst_seq, snd_seq)
    elif snd_seq.name.strip() == trgname:
        new_aln = _AssembleTrgTplAln(snd_seq, fst_seq)
    else:
        helper.MsgErrorAndExit("'--fasta %s' does not " % ' '.join(argstr)+
                               "define a target name found in the "+
                               "alignment.", 17)
    return seqfile, new_aln
def _AssembleTrgTplAln(target, template):
    """
    Internal function: Assemble a target-template alignment without leading/
    final gaps in the target sequence. Set the offset for the template sequence.

    :param target: Aligned target sequence, gaps marked by '-'.
    :param template: Aligned template sequence, indexed by the same columns as
                     the target.

    :returns: Alignment created by :func:`_CreateNewAln`, target and template
              cut down to the columns between the first and the last non-gap
              position of the target, number of removed leading columns as
              template offset.
    """
    # count leading gaps to get the start position
    start = 0
    for i in range(0, target.length):
        if target[i] != '-':
            start = i
            break
    # get rid of closing gaps at the end; walk down to the very first column,
    # otherwise a target with its only residue up front keeps its final gaps
    end = target.length
    for i in range(target.length, 0, -1):
        if target[i-1] != '-':
            end = i
            break
    # assemble template sequence
    tpl_str = ''.join(template[i] for i in range(start, end))
    return _CreateNewAln(target.name.strip(), str(target), start, end,
                         template.name.strip(), tpl_str, start)
class PM3StoreOnceAction(argparse.Action):
    """Argparse action storing a value but refusing to be called twice.

    The value is stored as long as the destination attribute is missing in the
    namespace or ``None``, otherwise an :class:`argparse.ArgumentError` is
    raised.
    """
    #pylint: disable=too-few-public-methods
    def __init__(self, *args, **kwargs):
        super(PM3StoreOnceAction, self).__init__(*args, **kwargs)

    def __call__(self, parser, namespace, values, option_string=None):
        stored = getattr(namespace, self.dest, None)
        if stored is None:
            # first use of the option, just keep the value
            setattr(namespace, self.dest, values)
            return
        raise argparse.ArgumentError(self, 'may only be used once.')
class PM3ArgumentParser(argparse.ArgumentParser):
    """
    This class is a child of :class:`argparse.ArgumentParser`. It provides a
    set of standard arguments which can be activated, rather than added via the
    traditional way. This helps keeping up a common naming scheme throughout
    all |project| actions. As a real extension, this subclass provides checking
    of input parameters on :meth:`Parse`. Beside
    this, everything you can do with a 'real' :class:`~argparse.ArgumentParser`
    instance is possible here.

    A note on exit codes: if :meth:`~pm3argparse.PM3ArgumentParser.Parse` is
    called on unrecognised arguments, the script exits with a code 2 by
    :class:`argparse.ArgumentParser.parse_args()`.

    Attributes beyond :class:`argparse.ArgumentParser`:

    .. attribute:: action

      Indicates if the calling script is a |project| action.

      :type: :class:`bool`
    """
    def __init__(self, description, action=True):
        """
        Create a new instance of :class:`~pm3argparse.PM3ArgumentParser`.

        :param description: Help text for this script, handed down to
                            |descattr|_ of |argpinit|_.
        :type description: :class:`str`

        :param action: Indicates if the calling script is a |project| action.
                       This influences |progattr|_ of
                       :class:`~argparse.ArgumentParser` by clipping of the
                       first 3 characters of the file name of the script. If
                       ``False``, default behaviour of
                       :class:`~argparse.ArgumentParser` kicks in.
        :type action: :class:`bool`

        :returns: :class:`argparse.ArgumentParser`.
        """
        prog = None
        if action:
            # clip the first 3 characters off the script's file name
            prog = os.path.basename(sys.argv[0])[3:]
        argparse.ArgumentParser.__init__(
            self, prog=prog, description=description,
            formatter_class=argparse.ArgumentDefaultsHelpFormatter)
        self.action = action
        # option packs requested via the Add* methods, put in place by
        # AssembleParser
        self.activate = dict()

    def _print_message(self, message, file=None):
        #pylint: disable=redefined-builtin
        """
        This is like a welcome message to the "country of bad style"... we are
        overwriting a "_" function from the parent-class. Those guys should not
        be used outside of the housing module, never... but here it is a single
        function to bend :mod:`argparse` to use :class:`ost.Logger`.

        Empty messages are dropped. A single trailing newline is removed, then
        the message goes to :func:`ost.LogError` if *file* is ``None`` or
        :data:`sys.stderr` and to :func:`ost.LogScript` otherwise.
        """
        if message:
            no_nl_msg = message
            if message[-1] == '\n':
                no_nl_msg = message[:-1]
            if file is None or file is sys.stderr:
                ost.LogError(no_nl_msg)
            else:
                ost.LogScript(no_nl_msg)

    def Parse(self, args=None):
        """
        Parse an argument string.

        :param args: The argument string. As default |sysargv|_ is used.
        :type args: :class:`list`

        :returns: :class:`promod3.core.pm3argparse.PM3OptionsNamespace`.
        """
        opts = PM3OptionsNamespace()
        self.parse_args(args=args, namespace=opts)
        # let the namespace run the checks of all activated option packs
        opts.PostProcess(self.activate.keys())
        return opts

    def AssembleParser(self):
        """
        When adding options via the :meth:`Add*` methods, call this after you
        are done. Everything before just tells the parser that it should
        contain those option sets but does not actually add anything.
        :meth:`AssembleParser` will put everything in place, in the right order
        and with the right constraints.
        """
        if 'ALIGNMENT' in self.activate:
            self._AssembleAlignment()

    def AddAlignment(self):
        """Commandline options for alignments.

        Add everything needed to load alignments to the argument parser. Creates
        several options/ arguments and adds some checks for post processing.
        This method only adds a flag to the parser to add alignment options on
        :meth:`AssembleParser`. Depending on which options you activate, things
        need to be added in a different order or have other constraints.

        Options/ arguments added:

        * ``-f/ --fasta trg:<NAME> <FILE>`` - describing a target-template
          alignment with ``trg:`` marking the target sequence inside
          :file:`<FILE>`. The order of arguments is arbitrary which means file
          names starting with :file:`trg:` will not work.

        * ``-j/ --json <OBJECT>|<FILE>`` - target-template alignments in JSON
          format. Either an object string or a file name. The string variant is
          limited to how many characters your command line can gobble.

        Exit codes related to alignment input:

        * 11 - no prefix ``trg:`` found for an argument to ``--fasta``

        * 12 - a given alignment file does not exist

        * 13 - never raised (parameter for checking gzip files)

        * 14 - empty target name found (``trg:``)

        * 15 - found an empty alignment file

        * 16 - alignment with more than 2 sequences found

        * 17 - target sequence name not found in alignment

        * 18 - sequences in the alignment have different length

        * 19 - problem with a JSON formatted file handed over to ``--json``

        * 20 - JSON file could not be decoded into a JSON object

        * 21 - JSON object has no 'alignmentlist' key

        * 22 - JSON object has no 'target'/ 'template' in the 'alignmentlist'

        * 23 - JSON string could not be decoded

        * 24 - JSON object 'alignmentlist' does not point to a list

        * 25 - JSON object 'alignmentlist' member is not a dictionary

        * 26 - JSON object 'alignmentlist' 'target'/ 'template' does not point
          to a dictionary

        * 27 - JSON object 'alignmentlist' 'target'/ 'template' does not have
          a needed key

        * 28 - JSON object 'alignmentlist' 'target'/ 'template' has a value of
          wrong type

        Attributes added to the namespace returned by
        :meth:`Parse`:

        * :attr:`fasta` - filled with the input of the ``--fasta`` option, a
          :class:`list` with multiple :class:`list` objects

        * :attr:`json` - argument of the ``--json`` option, :class:`str`. May
          be a filename of a JSON object string.

        * :attr:`alignments` - :class:`ost.AlignmentList`, same order as
          :attr:`fasta`, likely to **not** follow the order
          of JSON input; first sequence of the alignment is
          the target sequence, if in doubt, check for
          sequence roles ``TARGET`` or ``TEMPLATE``

        * :attr:`aln_sources` - the original source of the alignment, may be
          filename(s) or a string in JSON format,
          :class:`list` of all sources
        """
        self.activate['ALIGNMENT'] = 1

    def _AssembleAlignment(self):
        """
        Actually add alignment arguments/ options
        """
        aln_grp = self.add_mutually_exclusive_group(required=True)
        # FastA input: - always pairwise alignments
        #              - callable multiple times
        #              - goes by 'trg:<SEQNAME> <FILE>'
        #              - excludes JSON file/ object
        #              - leading whitespaces of FastA headers will be deleted
        aln_grp.add_argument('-f', '--fasta', nargs=2, action='append',
                             metavar=('trg:<NAME>', '<FILE>'),
                             help='Pairwise alignment in FastA format, needs '+
                             'to declare what is the target sequence.')
        # JSON input: - right as string or file
        #             - object starts with {, so not allowed for files
        #             - callable only ONCE: should include everything needed
        #             - fields/ objects used: ...
        #             - goes by '--json <OBJECT>|<FILE>'
        #             - excludes '--fasta'
        #             - leading whitespaces of sequence names will be deleted
        aln_grp.add_argument('-j', '--json', metavar='<OBJECT>|<FILE>',
                             help='Alignments provided as JSON file/ object.',
                             action=PM3StoreOnceAction)
class PM3OptionsNamespace(object):
    # class will grow, so for the moment pylint is ignored
    #pylint: disable=too-few-public-methods
    """
    This one is mainly for internal use. You can use it like everything that
    comes out of :meth:`argparse.ArgumentParser.parse_args`. Attributes are
    added regarding how you assembled your argument parser.
    """
    def __init__(self):
        pass

    def PostProcess(self, activated):
        """
        Post processing of activated option packs.

        :param activated: Names of the option packs to be processed, only
                          'ALIGNMENT' is acted upon.
        """
        if 'ALIGNMENT' in activated:
            self._PostProcessAlignment()

    def _PostProcessAlignment(self):
        #pylint: disable=no-member
        #pylint: disable=attribute-defined-outside-init
        """
        Doing some extra work after parsing.

        Reads the :attr:`fasta` and :attr:`json` attributes and creates
        :attr:`aln_sources` and :attr:`alignments`. FastA input takes
        precedence, JSON input is only looked at if :attr:`fasta` is empty.
        """
        self.aln_sources = list()
        self.alignments = seq.AlignmentList()
        if self.fasta:
            # one alignment and one source file per '--fasta' call
            for src in self.fasta:
                seqfile, new_aln = _FetchAlnFromFastaOpt(src)
                self.alignments.append(new_aln)
                self.aln_sources.append(seqfile)
            return
        # Now for JSON input. Since one of the options needs to be given and
        # we already checked for FastA, no need to open a new branch, here.
        # decide if file or object
        json_obj = _GetJSONOBject(self.json)
        for aln in _GetAlnFromJSON(json_obj, self.json):
            self.alignments.append(aln)
        # NOTE(review): the JSON input is recorded once as source of all its
        # alignments - confirm this is not meant to be one entry per alignment
        self.aln_sources.append(self.json)
# LocalWords: param attr prog argparse ArgumentParser bool sys os init str
# LocalWords: progattr descattr argpinit argv formatter meth args namespace
# LocalWords: ArgumentDefaultsHelpFormatter sysargv AssembleParser fasta io
# LocalWords: metavar trg tpl FastA gzip tempfile ost promod aln stderr src
# LocalWords: AssembleTrgTplAln CreateSequence SetSequenceOffset LogError
# LocalWords: LogScript OptionsNamespace PostProcess AssembleAlignment JSON
# LocalWords: AddAlignment AlignmentList SEQNAME whitespaces nargs trgname
# LocalWords: PostProcessAlignment startswith seqfile elif MsgErrorAndExit
# LocalWords: len FileExists gz FileGzip readfile fh NamedTemporaryFile fas
# LocalWords: LoadAlignment exc GetCount fst GetSequence snd