Skip to content
Snippets Groups Projects
Select Git revision
  • 8e3e2a56654d457e7c5c80b793832c57c0343f77
  • master default protected
  • develop protected
  • cmake_boost_refactor
  • ubuntu_ci
  • mmtf
  • non-orthogonal-maps
  • no_boost_filesystem
  • data_viewer
  • 2.11.1
  • 2.11.0
  • 2.10.0
  • 2.9.3
  • 2.9.2
  • 2.9.1
  • 2.9.0
  • 2.8.0
  • 2.7.0
  • 2.6.1
  • 2.6.0
  • 2.6.0-rc4
  • 2.6.0-rc3
  • 2.6.0-rc2
  • 2.6.0-rc
  • 2.5.0
  • 2.5.0-rc2
  • 2.5.0-rc
  • 2.4.0
  • 2.4.0-rc2
29 results

blast.py

Blame
  • blast.py 9.18 KiB
    from ost import settings
    import subprocess
    from xml.dom import minidom
    from ost import io, seq
    import ost
    import os, re, sys
    
    class AlignedPatch:
      """
      An aligned patch, aka. HSP
    
      .. attribute:: aln
    
        The local alignment. Sequence offset of both sequences in the alignment are
        set to the starting position in the query and target sequence, respectively.
    
        :type: :class:`~ost.seq.AlignmentHandle`
    
      .. attribute:: bit_score
    
        The bit score of the HSP
    
      .. attribute:: score
    
        The score of the HSP
    
      .. attribute:: evalue
    
        The E-value of the HSP
    
      .. attribute:: seqid
    
        The sequence identity of the HSP
      """
      def __init__(self, aln, bit_score, score, evalue, seqid):
        self.aln=aln
        self.bit_score=bit_score
        self.score=score
        self.evalue=evalue
        self.seqid=seqid
    
    class BlastHit:
      """
      A positive match found by BLAST.
      
      Each blast hit consists of one or more HSPs, encoded by the
      :class:`AlignedPatch` class.
      
      .. attribute:: identifier
    
        The identifier of the matching sequence
    
      .. attribute:: aligned_patches
    
        list of :class:`AlignedPatch` instances holding the actual HSPs.
      """
      def __init__(self, identifier, aligned_patches):
        self.identifier=identifier
        self.aligned_patches=aligned_patches
    
    
    def ParseBlastOutput(string, seqid_thres=0, evalue_thres=float("infinity")):
      """
      Parses the blast output and returns a list of BlastHits
      setting no seqid_thres or evalue_thres, restores default behaviour without filtering
      """
      def _GetText(node):
        rc=''
        for child in node.childNodes:
            if child.nodeType==child.TEXT_NODE:
                rc+=child.data
        return rc
    
      def _GetValue(node, tag_name):
        tag=node.getElementsByTagName(tag_name)
        assert len(tag)==1
        return _GetText(tag[0])
    
      def _GetInt(node, tag_name):
        return int(_GetValue(node, tag_name))
    
      def _ParseHsp(query_id, hit_id, hsp, tot_query_len, seqid_thres=0, evalue_thres=float("infinity")):
        bit_score=float(_GetValue(hsp, 'Hsp_bit-score'))
        score=float(_GetValue(hsp, 'Hsp_score'))
        evalue=float(_GetValue(hsp, 'Hsp_evalue'))
        try:
          identity=float(_GetValue(hsp, 'Hsp_identity'))
        except AssertionError:
          # The Hsp_identity tag is not a 'must' in the BLAST XML format. It
          # describes the number of matching characters. Hence we assume, if it is
          # missing, there are 0 matches.
          identity=0
        hsp_align_len=float(_GetValue(hsp, 'Hsp_align-len'))
        seqid=identity/hsp_align_len
        query_offset=_GetInt(hsp, 'Hsp_query-from')-1
        hit_offset=_GetInt(hsp, 'Hsp_hit-from')-1
        query_seq=seq.CreateSequence(str(query_id), str(_GetValue(hsp, 'Hsp_qseq')))
        query_seq.offset=query_offset
        hit_seq=seq.CreateSequence(str(hit_id), str(_GetValue(hsp, 'Hsp_hseq')))
        hit_seq.offset=hit_offset
        try:
          if seqid > float(seqid_thres) and evalue < evalue_thres:
            aln=seq.CreateAlignment(query_seq, hit_seq)
            return AlignedPatch(aln, bit_score, score, evalue, seqid)
    
        except Exception, e:
          print str(e), query_seq, hit_seq
    
      try:
        doc=minidom.parseString(string)
      except Exception, e:
        ost.LogError('Error while parsing BLAST output: %s' % str(e))
        return None
      hits=[]
      query_id=_GetValue(doc, 'BlastOutput_query-def')
      tot_query_len=_GetValue(doc, 'BlastOutput_query-len')
      for hit in doc.getElementsByTagName('Hit'):
        hit_id=_GetValue(hit, 'Hit_def')
        hsps=hit.getElementsByTagName('Hsp')
        aligned_patches=[_ParseHsp(query_id, hit_id, hsp, tot_query_len, seqid_thres, evalue_thres) for hsp in hsps]
        hits.append(BlastHit(hit_id, aligned_patches))
      return hits
    
    
    
    class BlastError(RuntimeError):
      def __init__(self, brief, details):
        self.brief=brief
        self.details=details
    
      def __str__(self):
        if self.details:
          return '%s\n%s' % (self.brief, self.details)
        else:
          return self.brief
    
    def CreateDB(infasta, dbout, mkdb_cmd=None):
      """
      Create a blast DB from a fasta file
    
      :param infasta: the pdb fasta from which the database will be created
      :type infasta: :class:`string`
    
      :param dbout: output location for blastDB file
      :type dbout: :class:`string`
    
    
      """
      if mkdb_cmd==None:
        try:
          exe=settings.Locate('formatdb')
          args=[exe, '-i', infasta, '-v',str(10000),'-n',dbout]
        except:
          try:
            exe=settings.Locate('makeblastdb')
            args=[exe, '-in', infasta, ' -max_file_sz 10GB -out', dbout,'-dbtype','prot']
            
          except:
            raise RuntimeError('could not find makeblastdb/formatdb executable')
      else:
        if os.path.basename(mkdb_cmd)=='makeblastdb':
          exe=settings.Locate('makeblastdb',explicit_file_name=mkdb_cmd)
          args=[exe, '-in', infasta, ' -max_file_sz 10GB -out', dbout,'-dbtype','prot']
        elif os.path.basename(mkdb_cmd)=='formatdb':
            exe=settings.Locate('formatdb',explicit_filename=mkdb_cmd)
            args=[exe, '-i', infasta, '-v',str(10000),'-n',dbout]
        else:
          raise IOError('mkdb command must either be the path to formatdb or makeblastdb!')
    
      cmd=' '.join(args)
      ost.LogInfo('creating blast DB (%s)' % cmd)
      os.system(cmd)
    
    def BlastVersion(blast_location=None):
      """
      Returns the version of the BLAST executable, e.g. 2.2.24 as a string
      """
    
      try:
        blast_exe=settings.Locate('blastall',explicit_file_name=blast_location)
      except:
        try:
          blast_exe=settings.Locate('blastp', explicit_file_name=blast_location)
        except:
          raise RuntimeError('could not find blast executable')
    
      if os.path.basename(blast_exe)=='blastall':
        args=[blast_exe]
        pattern=re.compile(r'blastall (\d+\.\d+\.\d+)\s+arguments:\s*')
    
      else:
        args=[blast_exe, '-version']
        pattern=re.compile(r'Package: blast (\d+\.\d+\.\d+),\s+')
    
      blast_pipe=subprocess.Popen(args, stdout=subprocess.PIPE,
                                  stderr=subprocess.PIPE)
      lines=blast_pipe.stdout.readlines()
    
      for line in lines:
        m=pattern.match(line)
        if m:
          return m.group(1)
      raise IOError("could not determine blast version for '%s'" % blast_exe)
    
    
    
    def Blast(query, database, gap_open=11, gap_ext=1, matrix='BLOSUM62',
             blast_location=None, outfmt=0, filter_low_complexity=True):
      """
      Runs a protein vs. protein blast search. The results are returned as a
      list of :class:`BlastHit` instances.
    
      :param query: the query sequence
      :type query: :class:`seq.ConstSequenceHandle`
    
      :param database: The filename of the sequence database. Make sure that
          formatdb has been run on the database and the <database>.pin file exists.
      :param matrix: The substitution matrix to be used. Must be one of 'BLOSUM45',
         'BLOSUM62', 'BLOSUM80', 'PAM30', 'PAM70'.
      :param gap_open: Gap opening penalty. Note that only a subset of gap opening
         penalties is supported for each substitutition matrix. Consult the blast
         docs for more information.
      :param gap_ext: Gap extension penalty. Only a subset of gap extension
         penalties are supported for each of the substitution matrices. Consult the
         blast docs for more information.
      :param outfmt: output format, where '0' corresponds to default output (parsed blast output and 1 to raw output)
      :param filter_low_complexity: Mask off segments of the query sequence that 
         have low compositional complexity, as determined by the SEG program of 
         Wootton & Federhen (Computers and Chemistry, 1993)
      """
      subst_mats=('BLOSUM45', 'BLOSUM62', 'BLOSUM80', 'PAM30', 'PAM70',)
      if matrix not in subst_mats:
        raise ValueError('matrix must be one of %s' % ', '.join(subst_mats))
      if not os.path.exists('%s.pin' % database) and not os.path.exists('%s.pal' % database):
        raise IOError("Database %s does not exist" % database)
      if blast_location!=None and not os.path.exists(blast_location):
        ost.LogScript('Could not find %s' %blast_location)
    
      if blast_location==None:
        try:
          blast_exe=settings.Locate('blastall')
        except:
          try:
            blast_exe=settings.Locate('blastp')
          except:
            raise RuntimeError('could not find blast executable')
      else:
        blast_exe=settings.Locate(os.path.basename(blast_location),explicit_file_name=blast_location)
    
      if os.path.basename(blast_exe)=='blastall':
        args=[blast_exe, '-d', database, '-p', 'blastp',
              '-m', '7', '-M', matrix, '-G', str(gap_open), '-E', str(gap_ext)]
        if filter_low_complexity==False:
          args.append('-F')
          args.append('F')
              
      else:
        complexity_opt='-seg'
        if filter_low_complexity==True:
          complexity_arg='yes'
        else:
          complexity_arg='no'
        args=[blast_exe, '-db', database, '-matrix', matrix,
              '-gapopen', str(gap_open), '-gapextend', str(gap_ext), '-outfmt', '5', complexity_opt, complexity_arg ]
    
      ost.LogInfo('running BLAST (%s)' % ' '.join(args))
      blast_pipe=subprocess.Popen(args, stderr=subprocess.PIPE,
                                  stdout=subprocess.PIPE, stdin=subprocess.PIPE)
      if isinstance(query, str):
        stdout, stderr=blast_pipe.communicate(query)
      else:
        stdout, stderr=blast_pipe.communicate(io.SequenceToString(query, 'fasta'))
    
      if len(stderr)>0:
         pattern=re.compile(r'^\[.*\]\s+ERROR:\s+(.*)')
         lines=stderr.split('\n')
         error_message=pattern.match(lines[0])
         if error_message:
           raise BlastError(error_message.group(1), '\n'.join(lines[1:]))
      if outfmt==0:
        return ParseBlastOutput(stdout)
      else:
        return stdout