Commit 1bb53ff2 authored by Gerardo Tauriello

Added phytoplasma-effectors project

parent 08cb3680
# Modelling of phytoplasma effectors
[Link to project in ModelArchive](https://modelarchive.org/doi/10.5452/ma-saps) (incl. background on the project itself)
Setup:
- Using AlphaFold v2.2 for monomer predictions with largely default settings
- Conversion based only on PDB files without extra data
- CSV file mapping model names to UniProt accession codes
Special features here:
- Generic code added to find best sequence match in UniProt history and report mismatches
- Sequences with remaining mismatches (here due to experimental assays) labelled in entity name
- One obsolete entry (SAP11_MBS linked to A0A1B3JKP4 in UniProt or UPI00083DE1DE in UniParc)
- pLDDT extracted from B-factors (simplest setup since no other QA scores are available anyway)
Content:
- translate2modelcif.py: script to do the conversion
- input_data: PDB files and CSV used for conversion
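
The script is run with OpenStructure's `ost` as shown in its docstring, e.g. `ost translate2modelcif.py ./structures ./accessions.csv ./modelcif --compress`. As a rough sketch (mirroring `_main()` in the script below; paths are illustrative), the CSV drives the conversion like this:

```python
import pandas as pd

# columns: Accession (UniProtKB AC), Name (effector name == PDB file name)
metadata = pd.read_csv("input_data/accessions.csv")
for _, row in metadata.iterrows():
    up_ac = row.Accession.strip()   # e.g. Q2NJQ2
    prot_name = row.Name.strip()    # e.g. SAP54 -> <model dir>/SAP54.pdb
    # _translate2modelcif(up_ac, prot_name, opts) writes <out dir>/SAP54.cif(.gz)
```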
Accession,Name
Q2NJQ2,SAP54
A0A0P7KHL3,SAP54_Peanut
A0A4Y5N0H8,SAP54_RP
Q2NJD7,SAP49
Q2NJN9,SAP41
Q2NJB0,SAP66
A0A410HXL4,SAP11_WBDL
Q2NK94,SAP05
Q2NJA6,SAP11
Q2NII6,SAP13
Q2NKA4,SAP21
Q2NJZ9,SAP27
Q2NIN8,SAP35
Q2NJL8,SAP42
Q2NJI2,SAP45
Q2NJA8,SAP67
Q2NJA7,SAP68
Q2NK93,SAP06
Q2NJJ6,SAP44
Q2NJD6,SAP48
A0A1B3JKP4,SAP11_MBS
#! /usr/local/bin/ost
# -*- coding: utf-8 -*-
"""Translate models for Miguel from PDB + extra data into ModelCIF."""
# EXAMPLES for running:
"""
ost translate2modelcif.py ./structures ./accessions.csv ./modelcif \
--compress > script_out.txt
"""
import argparse
import datetime
import gzip
import os
import shutil
import sys
from timeit import default_timer as timer
import numpy as np
import requests
import pandas as pd
import ihm
import ihm.citations
import modelcif
import modelcif.dumper
import modelcif.model
import modelcif.protocol
import modelcif.reference
from ost import io, seq
def _parse_args():
"""Parse command line arguments."""
parser = argparse.ArgumentParser(
formatter_class=argparse.RawDescriptionHelpFormatter,
description=__doc__,
)
parser.add_argument(
"model_dir",
type=str,
metavar="<MODEL DIR>",
help="Directory with PDB files to be translated.",
)
parser.add_argument(
"metadata_file",
type=str,
metavar="<METADATA FILE>",
help="Path to CSV file with metadata.",
)
parser.add_argument(
"out_dir",
type=str,
metavar="<OUTPUT DIR>",
help="Path to directory to store results.",
)
parser.add_argument(
"--compress",
default=False,
action="store_true",
help="Compress ModelCIF file with gzip.",
)
opts = parser.parse_args()
# check that model dir exists
if opts.model_dir.endswith("/"):
opts.model_dir = opts.model_dir[:-1]
if not os.path.exists(opts.model_dir):
_abort_msg(f"Model directory '{opts.model_dir}' does not exist.")
if not os.path.isdir(opts.model_dir):
_abort_msg(f"Path '{opts.model_dir}' does not point to a directory.")
# check metadata_file
if not os.path.exists(opts.metadata_file):
_abort_msg(f"Metadata file '{opts.metadata_file}' does not exist.")
if not os.path.isfile(opts.metadata_file):
_abort_msg(f"Path '{opts.metadata_file}' does not point to a file.")
# check out_dir
if opts.out_dir.endswith("/"):
opts.out_dir = opts.out_dir[:-1]
if not os.path.exists(opts.out_dir):
os.makedirs(opts.out_dir)
if not os.path.isdir(opts.out_dir):
_abort_msg(f"Path '{opts.out_dir}' does not point to a directory.")
return opts
# pylint: disable=too-few-public-methods
class _GlobalPLDDT(modelcif.qa_metric.Global, modelcif.qa_metric.PLDDT):
"""Predicted accuracy according to the CA-only lDDT in [0,100]"""
name = "pLDDT"
software = None
class _LocalPLDDT(modelcif.qa_metric.Local, modelcif.qa_metric.PLDDT):
"""Predicted accuracy according to the CA-only lDDT in [0,100]"""
name = "pLDDT"
software = None
# pylint: enable=too-few-public-methods
def _get_res_num(r, use_auth=False):
"""Get res. num. from auth. IDs if reading from mmCIF files."""
if use_auth:
return int(r.GetStringProp("pdb_auth_resnum"))
else:
return r.number.num
def _get_ch_name(ch, use_auth=False):
"""Get chain name from auth. IDs if reading from mmCIF files."""
if use_auth:
return ch.GetStringProp("pdb_auth_chain_name")
else:
return ch.name
class _OST2ModelCIF(modelcif.model.AbInitioModel):
"""Map OST entity elements to ihm.model"""
def __init__(self, *args, **kwargs):
"""Initialise a model"""
self.ost_entity = kwargs.pop("ost_entity")
self.asym = kwargs.pop("asym")
# use auth IDs for res. nums and chain names
self.use_auth = False
# fetch plddts per residue
self.plddts = []
for res in self.ost_entity.residues:
b_factors = [a.b_factor for a in res.atoms]
assert len(set(b_factors)) == 1 # must all be equal!
self.plddts.append(b_factors[0])
super().__init__(*args, **kwargs)
def get_atoms(self):
# ToDo [internal]: Take B-factor out since it's not a B-factor (the column holds the per-residue pLDDT)?
# NOTE: this assumes that _get_res_num maps residue to pos. in seqres
# within asym
for atm in self.ost_entity.atoms:
yield modelcif.model.Atom(
asym_unit=self.asym[_get_ch_name(atm.chain, self.use_auth)],
seq_id=_get_res_num(atm.residue, self.use_auth),
atom_id=atm.name,
type_symbol=atm.element,
x=atm.pos[0],
y=atm.pos[1],
z=atm.pos[2],
het=atm.is_hetatom,
biso=atm.b_factor,
occupancy=atm.occupancy,
)
def add_scores(self):
"""Add QA metrics from AF2 scores."""
# global scores
self.qa_metrics.append(
_GlobalPLDDT(np.mean(self.plddts))
)
# local scores
i = 0
for chn_i in self.ost_entity.chains:
ch_name = _get_ch_name(chn_i, self.use_auth)
for res_i in chn_i.residues:
# local pLDDT
res_num = _get_res_num(res_i, self.use_auth)
self.qa_metrics.append(
_LocalPLDDT(
self.asym[ch_name].residue(res_num),
self.plddts[i],
)
)
i += 1
def _abort_msg(msg, exit_code=1):
"""Write error message and exit with exit_code."""
print(f"{msg}\nAborting.", file=sys.stderr)
sys.exit(exit_code)
def _warn_msg(msg):
"""Write a warning message to stdout."""
print(f"WARNING: {msg}")
def _check_file(file_path):
"""Make sure a file exists and is actually a file."""
if not os.path.exists(file_path):
_abort_msg(f"File not found: '{file_path}'.")
if not os.path.isfile(file_path):
_abort_msg(f"File path does not point to file: '{file_path}'.")
def _get_audit_authors():
"""Return the list of authors that produced this model."""
return (
"Correa Marrero, Miguel",
"Capdevielle, Sylvain",
"Huang, Weijie",
"Al-Subhi, Ali M.",
"Busscher, Marco",
"Busscher-Lange, Jacqueline",
"van der Wal, Froukje",
"de Ridder, Dick",
"van Dijk, Aalt D.J.",
"Hogenhout, Saskia A.",
"Immink, Richard",
)
def _get_metadata(metadata_file):
"""Read csv file with metedata and prepare for next steps."""
metadata = pd.read_csv(metadata_file)
# make sure names (== PDB file names) are unique
assert len(set(metadata.Name)) == metadata.shape[0]
# columns: [Accession, Name]
return metadata
def _get_config():
"""Define AF setup."""
description = "Model generated using AlphaFold (v2.2.0) producing 5 " \
"monomer models with 3 recycles each, without model " \
"relaxation, using templates (up to Aug. 4 2022), ranked " \
"by pLDDT, starting from an MSA with reduced_dbs setting."
af_config = {
"model_preset": "monomer",
"db_preset": "reduced_dbs",
"max_template_date": "2022-08-04",
"run_relax": False,
}
return {
"af_config": af_config,
"af_version": "2.2.0",
"description": description,
"use_templates": True,
"use_small_bfd": True,
"use_multimer": False,
}
def _get_protocol_steps_and_software(config_data):
"""Create the list of protocol steps with software and parameters used."""
protocol = []
# modelling step
step = {
"method_type": "modeling",
"name": None,
"details": config_data["description"],
}
# get input data
# Must refer to data already in the JSON, so we try keywords
step["input"] = "target_sequences"
# get output data
# Must refer to existing data, so we try keywords
step["output"] = "model"
# get software
if config_data["use_multimer"]:
step["software"] = [{
"name": "AlphaFold-Multimer",
"classification": "model building",
"description": "Structure prediction",
"citation": ihm.Citation(
pmid=None,
title="Protein complex prediction with "
+ "AlphaFold-Multimer.",
journal="bioRxiv",
volume=None,
page_range=None,
year=2021,
authors=[
"Evans, R.",
"O'Neill, M.",
"Pritzel, A.",
"Antropova, N.",
"Senior, A.",
"Green, T.",
"Zidek, A.",
"Bates, R.",
"Blackwell, S.",
"Yim, J.",
"Ronneberger, O.",
"Bodenstein, S.",
"Zielinski, M.",
"Bridgland, A.",
"Potapenko, A.",
"Cowie, A.",
"Tunyasuvunakool, K.",
"Jain, R.",
"Clancy, E.",
"Kohli, P.",
"Jumper, J.",
"Hassabis, D.",
],
doi="10.1101/2021.10.04.463034",
),
"location": "https://github.com/deepmind/alphafold",
"type": "package",
"version": config_data["af_version"],
}]
else:
step["software"] = [{
"name": "AlphaFold",
"classification": "model building",
"description": "Structure prediction",
"citation": ihm.citations.alphafold2,
"location": "https://github.com/deepmind/alphafold",
"type": "package",
"version": config_data["af_version"],
}]
step["software_parameters"] = config_data["af_config"]
protocol.append(step)
return protocol
def _get_title(prot_name):
"""Get a title for this modelling experiment."""
return f"AlphaFold2 model of Candidatus Phytoplasma ({prot_name})"
def _get_model_details(prot_name):
"""Get the model description."""
return f"The AlphaFold2 model of Candidatus Phytoplasma ({prot_name}) is " \
f"part of a larger structural dataset. The complete dataset " \
f"comprises AlphaFold2 models of 21 different phytoplasma " \
f"effectors studied in a protein-protein interaction assay."
def _get_model_group_name():
"""Get a name for a model group."""
return None
def _get_sequence(chn, use_auth=False):
"""Get the sequence out of an OST chain incl. '-' for gaps in resnums."""
# initialise (add gaps if first is not at num. 1)
lst_rn = _get_res_num(chn.residues[0], use_auth)
idx = 1
sqe = "-" * (lst_rn - 1) \
+ chn.residues[0].one_letter_code
for res in chn.residues[idx:]:
lst_rn += 1
while lst_rn != _get_res_num(res, use_auth):
sqe += "-"
lst_rn += 1
sqe += res.one_letter_code
return sqe
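# Illustrative example (added for clarity, not from the original script): for a
# chain whose residues are numbered 2, 3 and 5 with one-letter codes A, C and D,
# _get_sequence returns "-AC-D" (gaps for the missing residue numbers 1 and 4).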
def _check_sequence(up_ac, sequence):
"""Verify sequence to only contain standard olc."""
ns_aa_pos = [] # positions of non-standard amino acids
for i, res in enumerate(sequence):
if res not in "ACDEFGHIKLMNPQRSTVWY":
if res == "U":
_warn_msg(
f"Selenocysteine found at position {i+1} of entry "
+ f"'{up_ac}', this residue may be missing in the "
+ "model."
)
ns_aa_pos.append(i)
continue
raise RuntimeError(
"Non-standard aa found in UniProtKB sequence "
+ f"for entry '{up_ac}': {res}, position {i+1}"
)
return ns_aa_pos
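# Illustrative example (added for clarity, not from the original script):
# _check_sequence("X1", "MKUV") warns about the selenocysteine and returns [2]
# (0-based position of the 'U'); any other non-standard one-letter code raises
# a RuntimeError.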
def _get_n_parse_up_entry(up_ac, up_url):
"""Get data for an UniProtKB entry and parse it."""
# This is a simple parser for UniProtKB txt format, instead of breaking it
# up into multiple functions, we just allow many many branches & statements,
# here.
# pylint: disable=too-many-branches,too-many-statements
data = {}
data["up_organism"] = ""
data["up_sequence"] = ""
data["up_ac"] = up_ac
rspns = requests.get(up_url, timeout=180)
for line in rspns.iter_lines(decode_unicode=True):
if line.startswith("ID "):
sline = line.split()
if len(sline) != 5:
raise RuntimeError(f"Unusual UniProtKB ID line found:\n" \
f"'{line}'")
data["up_id"] = sline[1]
elif line.startswith("OX NCBI_TaxID="):
# Following strictly the UniProtKB format: 'OX NCBI_TaxID=<ID>;'
data["up_ncbi_taxid"] = line[len("OX NCBI_TaxID=") : -1]
data["up_ncbi_taxid"] = data["up_ncbi_taxid"].split("{")[0].strip()
elif line.startswith("OS "):
if line[-1] == ".":
data["up_organism"] += line[len("OS ") : -1]
else:
data["up_organism"] += line[len("OS ") : -1] + " "
elif line.startswith("SQ "):
sline = line.split()
if len(sline) != 8:
raise RuntimeError(f"Unusual UniProtKB SQ line found:\n" \
f"'{line}'")
data["up_seqlen"] = int(sline[2])
data["up_crc64"] = sline[6]
elif line.startswith(" "):
sline = line.split()
if len(sline) > 6:
raise RuntimeError(
"Unusual UniProtKB sequence data line "
+ f"found:\n'{line}'"
)
data["up_sequence"] += "".join(sline)
elif line.startswith("RP "):
if "ISOFORM" in line.upper():
raise RuntimeError(
f"First ISOFORM found for '{up_ac}', needs handling."
)
elif line.startswith("DT "):
# 2012-10-03
dt_flds = line[len("DT ") :].split(", ")
if dt_flds[1].upper().startswith("SEQUENCE VERSION "):
data["up_last_mod"] = datetime.datetime.strptime(
dt_flds[0], "%d-%b-%Y"
)
elif dt_flds[1].upper().startswith("ENTRY VERSION "):
data["up_entry_version"] = dt_flds[1][len("ENTRY VERSION ") :]
if data["up_entry_version"][-1] == ".":
data["up_entry_version"] = data["up_entry_version"][:-1]
data["up_entry_version"] = int(data["up_entry_version"])
elif line.startswith("GN Name="):
data["up_gn"] = line[len("GN Name=") :].split(";")[0]
data["up_gn"] = data["up_gn"].split("{")[0].strip()
# we have not seen isoforms in the data set yet, so we just set it to None
data["up_isoform"] = None
# NOTE: no gene names in this set (use provided names instead)
# if "up_gn" not in data:
# _warn_msg(
# f"No gene name found for UniProtKB entry '{up_ac}', using "
# + "UniProtKB AC instead."
# )
# data["up_gn"] = up_ac
if "up_last_mod" not in data:
raise RuntimeError(f"No sequence version found for UniProtKB entry " \
f"'{up_ac}'.")
if "up_crc64" not in data:
raise RuntimeError(f"No CRC64 value found for UniProtKB entry " \
f"'{up_ac}'.")
if len(data["up_sequence"]) == 0:
raise RuntimeError(f"No sequence found for UniProtKB entry '{up_ac}'.")
# check that sequence length and CRC64 is correct
if data["up_seqlen"] != len(data["up_sequence"]):
raise RuntimeError(
"Sequence length of SQ line and sequence data differ for "
+ f"UniProtKB entry '{up_ac}': {data['up_seqlen']} != "
+ f"{len(data['up_sequence'])}"
)
data["up_ns_aa"] = _check_sequence(data["up_ac"], data["up_sequence"])
if "up_id" not in data:
raise RuntimeError(f"No ID found for UniProtKB entry '{up_ac}'.")
if "up_ncbi_taxid" not in data:
raise RuntimeError(f"No NCBI taxonomy ID found for UniProtKB entry " \
f"'{up_ac}'.")
if len(data["up_organism"]) == 0:
raise RuntimeError(f"No organism species found for UniProtKB entry " \
f"'{up_ac}'.")
return data
def _fetch_upkb_entry(up_ac):
"""Get an UniProtKB entry."""
return _get_n_parse_up_entry(
up_ac, f"https://rest.uniprot.org/uniprotkb/{up_ac}.txt"
)
def _fetch_unisave_entry(up_ac, version):
"""Get an UniSave entry, in contrast to an UniProtKB entry, that allows us
to specify a version."""
return _get_n_parse_up_entry(
up_ac,
f"https://rest.uniprot.org/unisave/{up_ac}?format=txt&"
+ f"versions={version}",
)
def _cmp_sequences(mdl, upkb, ns_aa_pos, deletion_mismatches=True):
"""Compare sequence while paying attention on non-standard amino acids.
Returns list of mismatches (up_pos, olc_up, olc_mdl) and covered UP range.
UniProt positions and ranges are 1-indexed.
Negative "up_pos" relates to (1-indexed) pos. in model for added residues.
If deletion_mismatches is True, res. in UP seq. but not in mdl within UP
range are counted as mismatches (N/C-terminal ones are never counted).
"""
# We add a U to the sequence when necessary. AF2 does not model it. The PDB
# has selenocysteine as canonical aa, see PDB entry 7Z0T.
for pos in ns_aa_pos:
if mdl[pos] != "-":
_abort_msg(
f"Position {pos+1} of non-canonical amino acid should be "
"a gap!"
)
mdl = mdl[0:pos] + "U" + mdl[pos + 1 :]
if mdl == upkb:
mismatches = []
up_range = (1, len(mdl))
else:
# align and report mismatches
up_seq = seq.CreateSequence("UP", upkb)
ch_seq = seq.CreateSequence("CH", mdl)
aln = seq.alg.SemiGlobalAlign(up_seq, ch_seq, seq.alg.BLOSUM62)[0]
# get range and mismatches
aligned_indices = [i for i, c in enumerate(aln) \
if c[0] != '-' and c[1] != '-']
up_range = (
aln.GetResidueIndex(0, aligned_indices[0]) + 1,
aln.GetResidueIndex(0, aligned_indices[-1]) + 1,
)
mismatches = []
for idx, (olc_up, olc_mdl) in enumerate(aln):
if olc_up != olc_mdl:
# mismatches are either extra res. in mdl/UP or mismatch
if olc_up == '-':
up_pos = -(aln.GetResidueIndex(1, idx) + 1)
else:
up_pos = aln.GetResidueIndex(0, idx) + 1
# ignore if out of UP range
if up_pos < up_range[0] or up_pos > up_range[1]:
continue
# optionally ignore extra res. in UP also otherwise
if not deletion_mismatches and olc_mdl == '-':
continue
mismatches.append((up_pos, olc_up, olc_mdl))
return mismatches, up_range
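# Illustrative example (added for clarity, not from the original script): with
# ns_aa_pos=[], _cmp_sequences("MKTLV", "MKSLV", []) aligns the two sequences
# residue by residue and returns ([(3, 'S', 'T')], (1, 5)): one mismatch at
# UniProt position 3 (S in UniProt, T in the model), covering UniProt range 1-5.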
def _get_upkb_for_sequence(sqe, up_ac, up_version=None):
"""Get UniProtKB entry data for given sequence.
If up_version given, we start from historical data in unisave.
Returns best possible hit (i.e. fewest mismatches between sqe and UP seq.)
as dict. with parsed UP data (see _get_n_parse_up_entry) with range covered
and mismatches (see _cmp_sequences) added as "up_range" and "mismatches".
"""
if up_version is None:
up_data = _fetch_upkb_entry(up_ac)
else:
up_data = _fetch_unisave_entry(up_ac, up_version)
min_up_data = None
while True:
mismatches, up_range = _cmp_sequences(sqe, up_data["up_sequence"],
up_data["up_ns_aa"])
if min_up_data is None or \
len(mismatches) < len(min_up_data["mismatches"]):
min_up_data = up_data
min_up_data["mismatches"] = mismatches
min_up_data["up_range"] = up_range
if len(mismatches) == 0:
# found hit; done
break
# fetch next one (skip if exceptions happen)
next_v = up_data["up_entry_version"] - 1
while next_v > 0:
try:
# note: can fail to parse very old UP versions...
up_data = _fetch_unisave_entry(up_ac, next_v)
# can move on if no exception happened
break
except RuntimeError as ex:
#_warn_msg(f"Error in parsing v{next_v} of {up_ac}:\n{ex}")
# try next one
next_v -= 1
if next_v == 0:
# warn user about failure to find match and abort
min_mismatches = min_up_data["mismatches"]
msg = f"Sequences not equal from file: {sqe}, from UniProtKB: " \
f"{min_up_data['up_sequence']} ({up_ac}), checked entire " \
f"entry history and best match had following mismatches " \
f"in v{min_up_data['up_entry_version']} (range " \
f"{min_up_data['up_range']}): {min_up_data['mismatches']}."
_warn_msg(msg)
# raise RuntimeError(msg)
break
return min_up_data
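# NOTE: for the obsolete entry A0A1B3JKP4 (SAP11_MBS, see README) the caller
# (_get_entities below) passes up_version=10, so the search starts from that
# UniSave version and walks backwards through older versions until an exact
# match is found (otherwise it warns and keeps the best-matching version).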
def _get_entities(pdb_file, up_ac, prot_name):
"""Gather data for the mmCIF (target) entities."""
_check_file(pdb_file)
ost_ent = io.LoadPDB(pdb_file)
if ost_ent.chain_count != 1:
raise RuntimeError(
f"Unexpected oligomer in {pdb_file}"
)
chn = ost_ent.chains[0]
sqe_no_gaps = "".join([res.one_letter_code for res in chn.residues])
sqe_gaps = _get_sequence(chn)
if sqe_no_gaps != sqe_gaps:
raise RuntimeError(f"Sequence in {pdb_file} has gaps for chain " \
f"{chn.name}")
# map to entities
# special case: A0A1B3JKP4 is obsolete (there may be better ways to catch that)
if up_ac == "A0A1B3JKP4":
up_version = 10
else:
up_version = None
upkb = _get_upkb_for_sequence(sqe_no_gaps, up_ac, up_version)
description = f"Candidatus Phytoplasma ({prot_name})"
if len(upkb["mismatches"]) != 0:
description += " (sequence mismatches due to sequencing from experimental assay)"
cif_ent = {
"seqres": sqe_no_gaps,
"pdb_sequence": sqe_no_gaps,
"pdb_chain_id": [_get_ch_name(chn, False)],
"prot_name": prot_name,
"description": description
}
cif_ent.update(upkb)
return [cif_ent], ost_ent
def _get_modelcif_entities(target_ents, asym_units, system):
"""Create ModelCIF entities and asymmetric units."""
for cif_ent in target_ents:
mdlcif_ent = modelcif.Entity(
# NOTE: sequence here defines residues in model!
cif_ent["seqres"],
description=cif_ent["description"],
source=ihm.source.Natural(
ncbi_taxonomy_id=cif_ent["up_ncbi_taxid"],
scientific_name=cif_ent["up_organism"],
),
references=[
modelcif.reference.UniProt(
cif_ent["up_id"],
cif_ent["up_ac"],
align_begin=cif_ent["up_range"][0],
align_end=cif_ent["up_range"][1],
isoform=cif_ent["up_isoform"],
ncbi_taxonomy_id=cif_ent["up_ncbi_taxid"],
organism_scientific=cif_ent["up_organism"],
sequence_version_date=cif_ent["up_last_mod"],
sequence_crc64=cif_ent["up_crc64"],
)
],
)
# NOTE: this assigns (potentially new) alphabetic chain names
for pdb_chain_id in cif_ent["pdb_chain_id"]:
asym_units[pdb_chain_id] = modelcif.AsymUnit(
mdlcif_ent, strand_id=pdb_chain_id,
)
system.target_entities.append(mdlcif_ent)
def _assemble_modelcif_software(soft_dict):
"""Create a modelcif.Software instance from dictionary."""
return modelcif.Software(
soft_dict["name"],
soft_dict["classification"],
soft_dict["description"],
soft_dict["location"],
soft_dict["type"],
soft_dict["version"],
citation=soft_dict["citation"],
)
def _get_sequence_dbs(config_data):
"""Get AF seq. DBs."""
# hard coded UniProt release (see https://www.uniprot.org/release-notes)
# (TO BE UPDATED FOR EVERY DEPOSITION!)
up_version = "2021_04"
up_rel_date = datetime.datetime(2021, 11, 17)
# fill list of DBs
seq_dbs = []
if config_data["use_small_bfd"]:
seq_dbs.append(modelcif.ReferenceDatabase(
"Reduced BFD",
"https://storage.googleapis.com/alphafold-databases/"
+ "reduced_dbs/bfd-first_non_consensus_sequences.fasta.gz"
))
else:
seq_dbs.append(modelcif.ReferenceDatabase(
"BFD",
"https://storage.googleapis.com/alphafold-databases/"
+ "casp14_versions/"
+ "bfd_metaclust_clu_complete_id30_c90_final_seq.sorted_opt.tar.gz",
version="6a634dc6eb105c2e9b4cba7bbae93412",
))
if config_data["af_version"] < "2.3.0":
seq_dbs.append(modelcif.ReferenceDatabase(
"MGnify",
"https://storage.googleapis.com/alphafold-databases/"
+ "casp14_versions/mgy_clusters_2018_12.fa.gz",
version="2018_12",
release_date=datetime.datetime(2018, 12, 6),
))
seq_dbs.append(modelcif.ReferenceDatabase(
"Uniclust30",
"https://storage.googleapis.com/alphafold-databases/"
+ "casp14_versions/uniclust30_2018_08_hhsuite.tar.gz",
version="2018_08",
release_date=None,
))
else:
# NOTE: release date according to https://ftp.ebi.ac.uk/pub/databases/metagenomics/peptide_database/2022_05/
seq_dbs.append(modelcif.ReferenceDatabase(
"MGnify",
"https://storage.googleapis.com/alphafold-databases/"
+ "v2.3/mgy_clusters_2022_05.fa.gz",
version="2022_05",
release_date=datetime.datetime(2022, 5, 6),
))
seq_dbs.append(modelcif.ReferenceDatabase(
"UniRef30",
"https://storage.googleapis.com/alphafold-databases/"
+ "v2.3/UniRef30_2021_03.tar.gz",
version="2021_03",
release_date=None,
))
if config_data["use_multimer"]:
seq_dbs.append(modelcif.ReferenceDatabase(
"TrEMBL",
"ftp://ftp.ebi.ac.uk/pub/databases/uniprot/current_release/"
+ "knowledgebase/complete/uniprot_trembl.fasta.gz",
version=up_version,
release_date=up_rel_date,
))
seq_dbs.append(modelcif.ReferenceDatabase(
"Swiss-Prot",
"ftp://ftp.ebi.ac.uk/pub/databases/uniprot/current_release/"
+ "knowledgebase/complete/uniprot_sprot.fasta.gz",
version=up_version,
release_date=up_rel_date,
))
seq_dbs.append(modelcif.ReferenceDatabase(
"UniRef90",
"ftp://ftp.uniprot.org/pub/databases/uniprot/uniref/uniref90/"
+ "uniref90.fasta.gz",
version=up_version,
release_date=up_rel_date,
))
if config_data["use_templates"]:
seq_dbs.append(modelcif.ReferenceDatabase(
"PDB70",
"http://wwwuser.gwdg.de/~compbiol/data/hhsuite/databases/"
+ "hhsuite_dbs/old-releases/pdb70_from_mmcif_200401.tar.gz",
release_date=datetime.datetime(2020, 4, 1)
))
return seq_dbs
def _get_modelcif_protocol_software(js_step):
"""Assemble software entries for a ModelCIF protocol step."""
if js_step["software"]:
if len(js_step["software"]) == 1:
sftwre = _assemble_modelcif_software(js_step["software"][0])
else:
sftwre = []
for sft in js_step["software"]:
sftwre.append(_assemble_modelcif_software(sft))
sftwre = modelcif.SoftwareGroup(elements=sftwre)
if js_step["software_parameters"]:
params = []
for key, val in js_step["software_parameters"].items():
params.append(modelcif.SoftwareParameter(key, val))
if isinstance(sftwre, modelcif.SoftwareGroup):
sftwre.parameters = params
else:
sftwre = modelcif.SoftwareGroup(
elements=(sftwre,), parameters=params
)
return sftwre
return None
def _get_modelcif_protocol_input(js_step, target_entities, ref_dbs, model):
"""Assemble input data for a ModelCIF protocol step."""
if js_step["input"] == "target_sequences":
input_data = modelcif.data.DataGroup(target_entities)
input_data.extend(ref_dbs)
elif js_step["input"] == "model":
input_data = model
else:
raise RuntimeError(f"Unknown protocol input: '{js_step['input']}'")
return input_data
def _get_modelcif_protocol_output(js_step, model):
"""Assemble output data for a ModelCIF protocol step."""
if js_step["output"] == "model":
output_data = model
else:
raise RuntimeError(f"Unknown protocol output: '{js_step['output']}'")
return output_data
def _get_modelcif_protocol(protocol_steps, target_entities, model, ref_dbs):
"""Create the protocol for the ModelCIF file."""
protocol = modelcif.protocol.Protocol()
for js_step in protocol_steps:
sftwre = _get_modelcif_protocol_software(js_step)
input_data = _get_modelcif_protocol_input(
js_step, target_entities, ref_dbs, model
)
output_data = _get_modelcif_protocol_output(js_step, model)
protocol.steps.append(
modelcif.protocol.Step(
input_data=input_data,
output_data=output_data,
name=js_step["name"],
details=js_step["details"],
software=sftwre,
)
)
protocol.steps[-1].method_type = js_step["method_type"]
return protocol
def _compress_cif_file(cif_file):
"""Compress cif file and delete original."""
with open(cif_file, 'rb') as f_in:
with gzip.open(cif_file + '.gz', 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
os.remove(cif_file)
def _store_as_modelcif(data_json, ost_ent, out_dir, mdl_name, compress):
"""Mix all the data into a ModelCIF file."""
print(" generating ModelCIF objects...", end="")
pstart = timer()
# create system to gather all the data
system = modelcif.System(
title=data_json["title"],
id=data_json["mdl_id"].upper(),
model_details=data_json["model_details"],
)
# create an asymmetric unit and an entity per target sequence
asym_units = {}
_get_modelcif_entities(
data_json["target_entities"], asym_units, system
)
# audit_authors
system.authors.extend(data_json["audit_authors"])
# set up the model to produce coordinates
model = _OST2ModelCIF(
assembly=modelcif.Assembly(asym_units.values()),
asym=asym_units,
ost_entity=ost_ent,
name="Top ranked model",
)
print(f" ({timer()-pstart:.2f}s)")
print(" processing QA scores...", end="", flush=True)
pstart = timer()
model.add_scores()
print(f" ({timer()-pstart:.2f}s)")
model_group = modelcif.model.ModelGroup(
[model], name=data_json["model_group_name"]
)
system.model_groups.append(model_group)
ref_dbs = _get_sequence_dbs(data_json["config_data"])
protocol = _get_modelcif_protocol(
data_json["protocol"], system.target_entities, model, ref_dbs
)
system.protocols.append(protocol)
# write modelcif System to file (NOTE: no PAE here!)
print(" write to disk...", end="", flush=True)
pstart = timer()
out_path = os.path.join(out_dir, f"{mdl_name}.cif")
with open(out_path, "w", encoding="ascii") as mmcif_fh:
modelcif.dumper.write(mmcif_fh, [system])
if compress:
_compress_cif_file(out_path)
print(f" ({timer()-pstart:.2f}s)")
def _translate2modelcif(up_ac, prot_name, opts):
"""Convert a model with its accompanying data to ModelCIF."""
mdl_id = prot_name
# skip if done already (disabled here due to info to be returned)
if opts.compress:
cifext = "cif.gz"
else:
cifext = "cif"
mdl_path = os.path.join(opts.out_dir, f"{mdl_id}.{cifext}")
# if os.path.exists(mdl_path):
# print(f" {mdl_id} already done...")
# return
# go for it...
print(f" translating {mdl_id}...")
pdb_start = timer()
# gather data into JSON-like structure
print(" preparing data...", end="")
pstart = timer()
# now we can fill all data
config_data = _get_config()
mdlcf_json = {}
mdlcf_json["audit_authors"] = _get_audit_authors()
mdlcf_json["protocol"] = _get_protocol_steps_and_software(config_data)
mdlcf_json["config_data"] = config_data
mdlcf_json["mdl_id"] = mdl_id
# process coordinates
pdb_file = os.path.join(opts.model_dir, f"{prot_name}.pdb")
target_entities, ost_ent = _get_entities(pdb_file, up_ac, prot_name)
mdlcf_json["target_entities"] = target_entities
# fill annotations
mdlcf_json["title"] = _get_title(prot_name)
mdlcf_json["model_details"] = _get_model_details(prot_name)
mdlcf_json["model_group_name"] = _get_model_group_name()
print(f" ({timer()-pstart:.2f}s)")
# save ModelCIF
_store_as_modelcif(mdlcf_json, ost_ent, opts.out_dir, mdl_id, opts.compress)
# check if result can be read and has expected seq.
ent = io.LoadMMCIF(mdl_path)
exp_seqs = [trg_ent["pdb_sequence"] \
for trg_ent in mdlcf_json["target_entities"]]
assert ent.chain_count == len(exp_seqs), f"Bad chain count {mdl_id}"
ent_seq = "".join(res.one_letter_code for res in ent.residues)
assert ent_seq == "".join(exp_seqs), f"Bad seq. {mdl_id}"
print(f" ... done with {mdl_id} ({timer()-pdb_start:.2f}s).")
def _main():
"""Run as script."""
opts = _parse_args()
# parse/fetch global data
metadata = _get_metadata(opts.metadata_file)
# get on with models
print(f"Working on {opts.metadata_file}...")
# iterate over models
for _, mrow in metadata.iterrows():
up_ac = mrow.Accession.strip()
prot_name = mrow.Name.strip()
_translate2modelcif(up_ac, prot_name, opts)
print(f"... done with {opts.metadata_file}.")
if __name__ == "__main__":
_main()