Commit a69eebed authored by Thomas Garello

added files for Juntao set

parent 74d8260e
# Modelling of histone complexes (structural prediction screen)
[Link to project in ModelArchive](https://www.modelarchive.org/doi/10.5452/ma-dm-hisrep) (incl. background on project itself)
Input files for conversion:
- Annotations.json with metadata (incl. UniProtKB AC)
- Config_Files directory with config_[X].json files and dates.json with model creation dates
- Zip_Files directory with files named [X]-[NAME].zip for each X listed in the metadata files
- ZIP files are expected to contain the 3 top ranked models (.pdb) with their respective scores (.json) and .png files for coverage-, pLDDT-, and PAE-plots as produced by ColabFold (all with their original file names)
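A hypothetical, minimal sketch of the per-model metadata fields the conversion script expects (field names taken from what translate2modelcif.py reads; the exact layout of Annotations.json may differ and contain additional keys):
```
{
  "mdl_id": "3-EXAMPLE-complex",
  "title": "Model of an example histone complex",
  "abstract": "Free-text description used as the model details",
  "chains": [
    {"chain": "A", "name": "Histone H3", "up_ac": "P68431", "up_range": "1-135"}
  ]
}
```
`up_range` may also be null when the full UniProt sequence is covered.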
Modelling setup:
- Classic ColabFold setup with PDB coordinates and scores JSON and link to UniProt
Special features here compared to the PRC-complexes script and PP2A-B55-design project:
- UniProt link: handling of sequence subsets and old entry versions, with a fixed cache of UniProt data; code adapted from the PP2A-B55-design project
- Use of modelcif.reference.SeqDif to handle mismatches between UniProt and entity sequence, improved from PP2A-B55-design to use more of the alignment data (alignment start/end)
- Improved caching mechanism (from PP2A-B55-design) for UniProt data, with model-sequence-specific keys
- Processing of the model creation dates in dates.json, provided specifically to determine ColabFold database names and versions, via the _get_cf_db_versions function
- Renaming of chain labels in the .pdb that have been shifted compared to the metadata ('A'->'B', 'B'->'C', etc.)
- Special processing for model 263, ignoring the first 5 aa introduced into the model sequence by the sequence alignment algorithm
- Local pLDDT accuracy threshold doubled to 0.011
- ipTM score check ignored
Content:
- translate2modelcif.py: script to do the conversion (run in a virtual environment with the same setup as the Docker container here, but with OST 2.8 and the latest main branch of python-modelcif and python-ihm as of 20.6.2024)
- minimal_example.zip: example input to convert a single complex from this set
- minimal_example_modelcif: output from running the conversion of minimal_example with the command below:
```python3 translate2modelcif.py ./ModelArchive-ma-dm-hisrep ./modelcif --single-model 3 --no-extra-files```
#! /usr/local/bin/ost
# -*- coding: utf-8 -*-
"""Translate PRC models for Juntao from PDB + extra data into ModelCIF."""
# EXAMPLES for running:
# ost translate2modelcif.py ./ModelArchive-ma-dm-hisrep ./modelcif
import argparse
import datetime
import gzip
import os
import shutil
import sys
import zipfile
from timeit import default_timer as timer
import numpy as np
import requests
import ujson as json
import ihm
import ihm.citations
import modelcif
import modelcif.associated
import modelcif.dumper
import modelcif.model
import modelcif.protocol
import modelcif.reference
import pandas as pd
from ost import io, seq, mol
################################################################################
# GENERAL HELPER FUNCTIONS
################################################################################
def _abort_msg(msg, exit_code=1):
"""Write error message and exit with exit_code."""
print(f"{msg}\nAborting.", file=sys.stderr)
sys.exit(exit_code)
def _warn_msg(msg):
"""Write a warning message to stdout."""
print(f"WARNING: {msg}")
def _check_file(file_path):
"""Make sure a file exists and is actually a file."""
if not os.path.exists(file_path):
_abort_msg(f"File not found: '{file_path}'.")
if not os.path.isfile(file_path):
_abort_msg(f"File path does not point to file: '{file_path}'.")
def _check_folder(dir_path):
"""Make sure a file exists and is actually a file."""
if not os.path.exists(dir_path):
_abort_msg(f"Path not found: '{dir_path}'.")
if not os.path.isdir(dir_path):
_abort_msg(f"Path does not point to a directory: '{dir_path}'.")
def _check_opts_folder(dir_path):
"""Remove trailing '/' (return fixed one) and check if path valid."""
if dir_path.endswith("/"):
dir_path = dir_path[:-1]
_check_folder(dir_path)
return dir_path
def _get_res_num(r, use_auth=False):
"""Get res. num. from auth. IDs if reading from mmCIF files."""
if use_auth:
return int(r.GetStringProp("pdb_auth_resnum"))
return r.number.num
def _get_ch_name(ch, use_auth=False):
"""Get chain name from auth. IDs if reading from mmCIF files."""
if use_auth:
return ch.GetStringProp("pdb_auth_chain_name")
return ch.name
def _get_sequence(chn, use_auth=False):
"""Get the sequence out of an OST chain incl. '-' for gaps in resnums."""
# initialise (add gaps if first is not at num. 1)
lst_rn = _get_res_num(chn.residues[0], use_auth)
idx = 1
sqe = "-" * (lst_rn - 1) + chn.residues[0].one_letter_code
for res in chn.residues[idx:]:
lst_rn += 1
while lst_rn != _get_res_num(res, use_auth):
sqe += "-"
lst_rn += 1
sqe += res.one_letter_code
return sqe
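# Example for _get_sequence (illustration only): a chain whose residues are
# numbered 2, 3 and 5 with one-letter codes M, K and L yields "-MK-L", i.e.
# gaps in the residue numbering are padded with '-' characters.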
def shift_letter(letter):
"""Move letters backward in the alphabet. 'A' would raise an exception."""
# Convert letter to its ASCII value
ascii_value = ord(letter)
# Shift the ASCII value by -1
shifted_value = ascii_value - 1
# Handle 'A'
if shifted_value < ord('A'):
        raise RuntimeError(
            "Could not shift the letter 'A' backward in the alphabet"
        )
# Convert back to a character
return chr(shifted_value)
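# Usage sketch for shift_letter (illustration only): shift_letter('B') returns
# 'A', shift_letter('C') returns 'B', and shift_letter('A') raises a
# RuntimeError. This undoes the forward shift of chain labels observed in the
# provided PDB files.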
################################################################################
################################################################################
# DATA HANDLING
################################################################################
def _parse_args():
"""Parse command line arguments."""
parser = argparse.ArgumentParser(
formatter_class=argparse.RawDescriptionHelpFormatter,
description=__doc__,
)
parser.add_argument(
"input_data_path",
type=str,
metavar="<INPUT DATA PATH>",
help="Data as provided by depositors. Expected to contain files "
+ "Annotations.json with metadata, Config_Files "
+ "directory with config_[X].json files, dates.json"
+ "with model creation dates, and Zip_Files directory with files named "
+ "[X]-[NAME].zip for each X listed in the metadata files.",
)
parser.add_argument(
"out_dir",
type=str,
metavar="<OUTPUT DIR>",
help="Path to directory to store results ([X]-[NAME].* files and "
+ "issues.json with any observed issues).",
)
parser.add_argument(
"--compress",
default=False,
action="store_true",
help="Compress ModelCIF file with gzip.",
)
parser.add_argument(
"--checks-only",
default=False,
action="store_true",
help="Perform only checks without producing ModelCIF files.",
)
parser.add_argument(
"--no-extra-files",
default=False,
action="store_true",
help="Skip writing extra models, PNGs, and PAE (for testing).",
)
parser.add_argument(
"--single-model",
type=str,
#metavar="<PDB WEB PATH>",
default=None,
help="If provided, only the model matching the provided string in the "
+ "Annotations.json will be converted.",
)
opts = parser.parse_args()
# check input
opts.input_data_path = _check_opts_folder(opts.input_data_path)
_check_file(os.path.join(opts.input_data_path, "Annotations", "Annotations.json"))
_check_folder(os.path.join(opts.input_data_path, "Configs"))
_check_folder(os.path.join(opts.input_data_path, "Zip_files"))
if opts.out_dir.endswith("/"):
opts.out_dir = opts.out_dir[:-1]
if not os.path.exists(opts.out_dir):
os.makedirs(opts.out_dir, exist_ok=True)
return opts
def _get_audit_authors():
"""Return the list of authors that produced this model."""
return (
"Yu, Juntao",
"Zhang, Yujie",
"Fang, Yimeng",
"Paulo, Joao A.",
"Yaghoubi, Dadmehr",
"Hua, Xu",
"Shipkovenska, Gergana",
"Toda, Takenori",
"Zhang, Zhiguo",
"Gygi, Steven P.",
"Jia, Songtao",
"Li, Qing",
"Moazed, Danesh",
)
def _zip_file_check(zf, file_name, desired_ranks, data_from_zip, exp_relaxed):
"""Fill data_from_zip with info from given file in zipped file handle zf if
data is to be added (depends on file_name).
Only ranks from 1 to desired_ranks are parsed.
    Return False if the file was not parsed (ok if file_name is a hidden
    file; unexpected otherwise).
"""
file_base_path = os.path.basename(file_name)
file_base, file_ext = os.path.splitext(file_base_path)
if file_base.startswith('.'):
return False
# check for PNGs
for png_type in ["coverage", "plddt", "pae"]: #TODO: should not allow overwrite
if file_name.lower().endswith(f"_{png_type}.png") and ("{png_type}_png_data" not in data_from_zip.keys()):
data_from_zip[f"{png_type}_png_data"] = zf.open(file_name).read()
data_from_zip[f"{png_type}_png_file_name"] = file_base_path
return True
# special case: config.json
if file_name == "config.json":
data_from_zip["config"] = json.load(zf.open(file_name))
return True
# check for PDB and JSON of given rank
# -> PDB file name example: ..._unrelaxed_rank_001_..._model_1_seed_000.pdb
# -> JSON file name example: ..._scores_rank_001_..._model_1_seed_000.json
if file_ext in [".pdb", ".json"]:
ss = file_base.split('_')
mdl_rank = int(ss[ss.index("rank") + 1])
if mdl_rank < 1 or mdl_rank > desired_ranks:
return False
mdl_rank_key = f"rank_{mdl_rank}"
if mdl_rank_key not in data_from_zip:
data_from_zip[mdl_rank_key] = {}
mdl_dict = data_from_zip[mdl_rank_key]
if file_ext == ".pdb":
# duplicate = extra unhandled file
if "ent" in mdl_dict or (exp_relaxed and "_unrelaxed_" in file_base):
return False
mdl_dict["mdl_file_base"] = file_base
mdl_dict["mdl_id"] = '_'.join(ss[ss.index("model"):])
mdl_dict["ent"] = io.PDBStrToEntity(
zf.open(file_name).read(),
profile=io.profiles["DEFAULT"],
process=True
)
return True
elif file_ext == ".json":
# duplicate = extra unhandled file
if "scores" in mdl_dict:
return False
mdl_dict["scores"] = json.load(zf.open(file_name))
return True
return False
def _parse_zip_file(zip_file_path, desired_ranks, incl_relaxed=False):
"""Parse data in provided ZIP files and checks for desired ranks.
Returns dict. with keys "rank_X" for X from 1 to desired_ranks incl.:
- mdl_file_base: file name of selected PDB file w/o directory and extension
- mdl_id: ID of selected PDB file (e.g. "model_1_seed_000")
- ent: OST entity for PDB file
- scores: dict. loaded from scores JSON
It further contains info on PNG files as:
- [png_type]_png_data: data in file (to be written back into accomp. zip)
- [png_type]_png_file_name: file name w/o directory used in ZIP file
for [png_type] in [coverage, plddt, pae].
Optionally available keys (if available in zip file):
- config: dict. loaded from config.json
If expected files are not found, an exception is raised.
If additional files are found, a warning is shown.
"""
file_dates = []
data_from_zip = {}
unparsed_files = []
with zipfile.ZipFile(zip_file_path) as zf:
# handle case where it's just a zip file in a zip file
nested_zip_files = [f for f in zf.namelist() if f.endswith(".zip")]
if len(zf.namelist()) == 1 and len(nested_zip_files) == 1:
with zf.open(nested_zip_files[0]) as nested_zip:
return _parse_zip_file(nested_zip, desired_ranks, incl_relaxed)
# handle all other cases
for file_name in zf.namelist():
check = _zip_file_check(
zf, file_name, desired_ranks, data_from_zip, incl_relaxed
)
            # a failed check is ok/expected for unrelaxed files when relaxed
            # ones are included
if not check and not os.path.basename(file_name).startswith('.') \
and not (incl_relaxed and "_unrelaxed_" in file_name):
unparsed_files.append(file_name)
else:
file_date = zf.getinfo(file_name).date_time
file_dates.append(datetime.datetime(*file_date))
# check if complete
exp_keys = [f"rank_{num+1}" for num in range(desired_ranks)]
for png_type in ["coverage", "plddt", "pae"]:
exp_keys.extend([
f"{png_type}_png_data", f"{png_type}_png_file_name"
])
unmatched_keys = set(exp_keys) - set(data_from_zip)
if len(unmatched_keys) != 0:
_warn_msg(
f"Could not find expected files in {zip_file_path}. " \
f"Missing {sorted(unmatched_keys)}."
)
exp_mdl_keys = sorted(["mdl_file_base", "mdl_id", "ent", "scores"])
for num in range(desired_ranks):
if exp_mdl_keys != sorted(data_from_zip[f"rank_{num+1}"].keys()):
raise RuntimeError(
f"Could not find expected files in {zip_file_path} " \
f"for rank {num + 1}."
)
extra_stuff = sorted(set(data_from_zip) - set(exp_keys + ["config"]))
extra_stuff += sorted(unparsed_files)
if len(extra_stuff) != 0:
_warn_msg(
f"Extra unexpected content found in {zip_file_path}: " \
f"{extra_stuff}"
)
# add date range
data_from_zip["date_range"] = (min(file_dates), max(file_dates))
return data_from_zip
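# Usage sketch for _parse_zip_file (hypothetical file name):
#   data = _parse_zip_file("Zip_Files/3-EXAMPLE.zip", desired_ranks=3)
# returns a dict with keys "rank_1".."rank_3" (each holding "mdl_file_base",
# "mdl_id", "ent" and "scores"), the [png_type]_png_data/_png_file_name
# entries for the coverage/plddt/pae plots, optionally "config", and a
# "date_range" tuple derived from the ZIP member timestamps.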
def _check_scores(mdl_data_from_zip, metadata, mdl_rank):
"""Check scores JSON.
Bad issues raise exceptions, minor ones are in returned list
(compatible with list returned by _get_entities)
"""
issues = []
scores_json = mdl_data_from_zip["scores"]
# NOTE: cannot deal with gapped sequences here as we cannot map
# multiple chains to scores
ost_ent = mdl_data_from_zip["ent"]
exp_len = ost_ent.residue_count
assert "ptm" in scores_json
assert len(scores_json["pae"]) == exp_len
assert len(scores_json["pae"][0]) == exp_len
# check actual scores
# b-factor vs pLDDT in expected range?
ent_plddts = []
for i, res in enumerate(ost_ent.residues):
b_factors = [a.b_factor for a in res.atoms]
assert len(set(b_factors)) == 1 # must all be equal!
ent_plddts.append(b_factors[0])
scores_plddts = scores_json["plddt"]
assert len(ent_plddts) == len(scores_plddts)
plddt_max_diff = max([
abs(s1 - s2) for s1, s2 in zip(ent_plddts, scores_plddts)
])
# threshold due to 0.01 accuracy in PDB file + numerical rounding
if plddt_max_diff > 0.011:
issues.append((
metadata['mdl_id'],
"plddt_vs_bf_mismatch",
(mdl_rank, plddt_max_diff),
()
))
return issues
def _get_n_parse_up_entry(up_ac, up_txt_path):
"""Get data for an UniProtKB entry and parse it."""
# This is a simple parser for UniProtKB txt format, instead of breaking it
# up into multiple functions, we just allow many many branches & statements,
# here.
# pylint: disable=too-many-branches,too-many-statements
data = {}
data["up_organism"] = ""
data["up_sequence"] = ""
data["up_ac"] = up_ac
# check if we read from file or URL
if up_txt_path.startswith("http"):
rspns = requests.get(up_txt_path, timeout=180)
lines = rspns.iter_lines(decode_unicode=True)
else:
lines = open(up_txt_path).readlines()
for line_ in lines:
# need to strip trailing characters if reading from file (doesn't hurt)
line = line_.rstrip()
if line.startswith("ID "):
sline = line.split()
if len(sline) != 5:
raise RuntimeError(f"Unusual UniProtKB ID line found:\n" \
f"'{line}'")
data["up_id"] = sline[1]
elif line.startswith("OX NCBI_TaxID="):
# Following strictly the UniProtKB format: 'OX NCBI_TaxID=<ID>;'
data["up_ncbi_taxid"] = line[len("OX NCBI_TaxID=") : -1]
data["up_ncbi_taxid"] = data["up_ncbi_taxid"].split("{")[0].strip()
elif line.startswith("OS "):
# multiple lines possible; last one ends in "."
if line[-1] == ".":
data["up_organism"] += line[len("OS ") : -1]
else:
data["up_organism"] += line[len("OS ") :] + " "
elif line.startswith("SQ "):
sline = line.split()
if len(sline) != 8:
raise RuntimeError(f"Unusual UniProtKB SQ line found:\n" \
f"'{line}'")
data["up_seqlen"] = int(sline[2])
data["up_crc64"] = sline[6]
elif line.startswith(" "):
sline = line.split()
if len(sline) > 6:
raise RuntimeError(
"Unusual UniProtKB sequence data line "
+ f"found:\n'{line}'"
)
data["up_sequence"] += "".join(sline)
elif line.startswith("DT "):
dt_flds = line[len("DT ") :].split(", ")
if dt_flds[1].upper().startswith("SEQUENCE VERSION "):
data["up_last_mod"] = datetime.datetime.strptime(
dt_flds[0], "%d-%b-%Y"
)
elif dt_flds[1].upper().startswith("ENTRY VERSION "):
data["up_entry_version"] = dt_flds[1][len("ENTRY VERSION ") :]
if data["up_entry_version"][-1] == ".":
data["up_entry_version"] = data["up_entry_version"][:-1]
data["up_entry_version"] = int(data["up_entry_version"])
elif line.startswith("GN Name="):
data["up_gn"] = line[len("GN Name=") :].split(";")[0]
data["up_gn"] = data["up_gn"].split("{")[0].strip()
# in UP isoforms are identified in the AC so no need for this...
# -> in PDB (e.g. 8TRE), we see unset _struct_ref.pdbx_db_isoform in such cases
data["up_isoform"] = None
# NOTE: no gene names in this set (use provided names instead)
if "up_gn" not in data:
_warn_msg(
f"No gene name found for UniProtKB entry '{up_ac}', using "
+ "UniProtKB AC instead."
)
data["up_gn"] = up_ac
if "up_last_mod" not in data:
raise RuntimeError(f"No sequence version found for UniProtKB entry " \
f"'{up_ac}'.")
if "up_crc64" not in data:
raise RuntimeError(f"No CRC64 value found for UniProtKB entry " \
f"'{up_ac}'.")
if len(data["up_sequence"]) == 0:
raise RuntimeError(f"No sequence found for UniProtKB entry '{up_ac}'.")
# check that sequence length and CRC64 is correct
if data["up_seqlen"] != len(data["up_sequence"]):
raise RuntimeError(
"Sequence length of SQ line and sequence data differ for "
+ f"UniProtKB entry '{up_ac}': {data['up_seqlen']} != "
+ f"{len(data['up_sequence'])}"
)
if "up_id" not in data:
raise RuntimeError(f"No ID found for UniProtKB entry '{up_ac}'.")
if "up_ncbi_taxid" not in data:
raise RuntimeError(f"No NCBI taxonomy ID found for UniProtKB entry " \
f"'{up_ac}'.")
if len(data["up_organism"]) == 0:
raise RuntimeError(f"No organism species found for UniProtKB entry " \
f"'{up_ac}'.")
return data
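# The dict returned by _get_n_parse_up_entry contains: up_ac, up_id,
# up_sequence, up_seqlen, up_crc64, up_ncbi_taxid, up_organism, up_last_mod
# (sequence version date), up_entry_version, up_gn (gene name, falling back to
# the AC) and up_isoform (always None here).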
def _fetch_upkb_entry(up_ac):
"""Get an UniProtKB entry."""
return _get_n_parse_up_entry(
up_ac, f"https://rest.uniprot.org/uniprotkb/{up_ac}.txt"
)
def _fetch_unisave_entry(up_ac, version):
"""Get an UniSave entry, in contrast to an UniProtKB entry, that allows us
to specify a version."""
return _get_n_parse_up_entry(
up_ac,
f"https://rest.uniprot.org/unisave/{up_ac}?format=txt&"
+ f"versions={version}",
)
# for cache below
upkb_entry_cache = {} # key = (up_ac, up_version, mdl_sequence)
def _fetch_upkb_cached(sqe, up_ac, up_version=None):
"""Get best matching UniProtKB entry for sequence sqe.
Get it from cache if already fetched.
up_version defines starting version in UP to check.
Note that the returned UP entry may be a different version than the one in up_version.
Returned UP data includes result of _align_sequences.
If no perfect match found, it prints a warning.
"""
# check if in cache already
cache_key = (up_ac, up_version, sqe)
if cache_key in upkb_entry_cache:
return upkb_entry_cache[cache_key]
# fetch and parse first guess
if up_version is None:
up_data = _fetch_upkb_entry(up_ac)
else:
up_data = _fetch_unisave_entry(up_ac, up_version)
# find best match starting from first guess
min_up_data = None
while True:
mismatches, up_range, mdl_range, covered_aln, mdl_seqres = _align_sequences(
sqe, up_data["up_sequence"], atomseq_aln=False)
if min_up_data is None or \
len(mismatches) < len(min_up_data["mismatches"]):
min_up_data = up_data
min_up_data["mismatches"] = mismatches
min_up_data["up_range"] = up_range
min_up_data["mdl_range"] = mdl_range
min_up_data["covered_aln"] = covered_aln
min_up_data["mdl_seqres"] = mdl_seqres
if len(mismatches) == 0:
# found hit; done
break
# fetch next one (skip if exceptions happen)
next_v = up_data["up_entry_version"] - 1
while next_v > 0:
try:
# note: can fail to parse very old UP versions...
up_data = _fetch_unisave_entry(up_ac, next_v)
# can move on if no exception happened
break
except RuntimeError as ex:
# _warn_msg(f"Error in parsing v{next_v} of {up_ac}:\n{ex}")
# try next one
next_v -= 1
if next_v == 0:
# warn user about failure to find match and abort
msg = f"Sequences not equal from file: {sqe}, from UniProtKB: " \
f"{min_up_data['up_sequence']} ({up_ac}), checked entire " \
f"entry history and best match had following mismatches " \
f"in v{min_up_data['up_entry_version']} (range " \
f"{min_up_data['up_range']}): {min_up_data['mismatches']}."
_warn_msg(msg)
upkb_entry_cache[cache_key] = min_up_data
return min_up_data
# keep in cache
upkb_entry_cache[cache_key] = up_data
return min_up_data
def _align_sequences(mdl_sqe, ref_sqe, atomseq_aln=True, ref_fixes=[],
gapped_aa="XOUBJZ"):
"""Compare sequence while paying attention on non-standard amino acids.
Can pass list of tuples for OLCs expected to be changed between ref and mdl.
E.g. Jason set would have ref_fixes=[('B', 'D'), ('J', 'L'), ('Z', 'E')].
Non-def. AA (listed in gapped_aa) in ref_sqe are assumed to be gaps (-) in
mdl_sqe (this is def. in CF/AF for "XOUBJZ").
Returns (mismatches, ref_range, mdl_range, covered_aln, mdl_seqres):
- mismatches = list of (ref_pos, mdl_pos, olc_ref, olc_mdl)
(positions are 1-indexed, None if gap and only if in range)
- ref_range / mdl_range = (start, end) tuples with 1-indexed positions of
start and end of covered range (mdl_range with respect to mdl_sqe!).
Extra non-covered residues in mdl or ref can be counted by comparing
ref_range / mdl_range with lengths of ref_sqe / mdl_sqe.
- covered_aln = alignment (seq. 0 = REF, seq. 1 = MDL) within covered range
(i.e. first and last column have no gaps). If atomseq_aln is True, the
alignment only includes non-gap residues of mdl_sqe. OST seq. offsets are
set with respect to mdl_sqe, ref_sqe (0-indexed). Note that offsets are
only guaranteed to fit ref_range / mdl_range if atomseq_aln is False.
- mdl_seqres = mdl_sqe with gaps (-) replaced with seq. from ref. if
non-def-AA there or with X otherwise (i.e. both have same length!).
Here guaranteed for mdl_seqres to match mdl_sqe if AA in gapped_aa and X
are replaced by gaps (-).
"""
# add fixes if needed
ref_sqe_fixed = ref_sqe
for olc1, olc2 in ref_fixes:
ref_sqe_fixed = ref_sqe_fixed.replace(olc1, olc2)
# put gaps for parts not modelled by AF2 (i.e. any non-def-AA)
ref_sqe_fixed_gapped = ref_sqe_fixed
for olc in gapped_aa:
assert olc not in mdl_sqe
ref_sqe_fixed_gapped = ref_sqe_fixed_gapped.replace(olc, '-')
# easy and preferred case: mdl_sqe is subset of ref_sqe
ref_idx = ref_sqe_fixed_gapped.find(mdl_sqe)
if ref_idx >= 0:
mismatches = []
ref_range = (ref_idx + 1, ref_idx + len(mdl_sqe))
mdl_range = (1, len(mdl_sqe))
mdl_seqres = ref_sqe_fixed[ref_idx : ref_idx + len(mdl_sqe)]
# we handle covered_aln afterwards...
aln_s_ref = ref_sqe[ref_idx : ref_idx + len(mdl_sqe)]
aln_s_mdl = mdl_seqres
else:
# align and report mismatches
ref_seq = seq.CreateSequence("REF", ref_sqe_fixed)
# use X as first guess for gaps in model
mdl_seq = seq.CreateSequence("MDL", mdl_sqe.replace('-', 'x'))
aln = seq.alg.SemiGlobalAlign(ref_seq, mdl_seq, seq.alg.BLOSUM62)[0]
# get range
aligned_indices = [i for i, c in enumerate(aln) \
if c[0] != '-' and c[1] != '-']
ref_range = (
aln.GetResidueIndex(0, aligned_indices[0]) + 1,
aln.GetResidueIndex(0, aligned_indices[-1]) + 1,
)
mdl_range = (
aln.GetResidueIndex(1, aligned_indices[0]) + 1,
aln.GetResidueIndex(1, aligned_indices[-1]) + 1,
)
# build up strings as we go
aln_s_ref = ""
aln_s_mdl = ""
mdl_seqres = ""
# collect mismatches and fix seqs as we go
mismatches = []
for idx, (olc_ref, olc_mdl) in enumerate(aln):
# fix seqres as needed
if olc_mdl == 'x' and olc_ref in gapped_aa:
olc_mdl = olc_ref
if olc_mdl != '-':
mdl_seqres += olc_mdl
if idx >= aligned_indices[0] and idx <= aligned_indices[-1]:
# fill aln_s_x as needed
if olc_ref != '-':
# must fetch from ref_sqe
ref_idx = aln.GetResidueIndex(0, idx)
aln_s_ref += ref_sqe[ref_idx]
ref_pos = ref_idx + 1
else:
aln_s_ref += '-'
ref_pos = None
if olc_mdl != '-':
# fetch from mdl_seqres here
# (revert back to mdl_sqe afterwards)
mdl_idx = aln.GetResidueIndex(1, idx)
aln_s_mdl += mdl_seqres[mdl_idx]
mdl_pos = mdl_idx + 1
else:
aln_s_mdl += '-'
mdl_pos = None
if olc_ref != olc_mdl:
mismatches.append((ref_pos, mdl_pos, olc_ref, olc_mdl))
# fix remaining x in mdl_seqres
mdl_seqres = mdl_seqres.replace('x', 'X')
# create covered_aln
s_ref_offset = ref_range[0] - 1
s_mdl_offset = mdl_range[0] - 1
covered_aln = seq.CreateAlignment(
seq.CreateSequence("REF", aln_s_ref),
seq.CreateSequence("MDL", aln_s_mdl.replace('x', 'X'))
)
# cut it once again if needed (only for atomseq_aln)
if atomseq_aln:
# revert
new_cols = [
(
olc_ref,
'-' if olc_mdl == 'x' or olc_mdl in gapped_aa else olc_mdl
) for olc_ref, olc_mdl in zip(aln_s_ref, aln_s_mdl)
]
aligned_indices = [i for i, c in enumerate(new_cols) \
if c[0] != '-' and c[1] != '-']
s_ref_offset += covered_aln.GetResidueIndex(0, aligned_indices[0])
s_mdl_offset += covered_aln.GetResidueIndex(1, aligned_indices[0])
cut_cols = new_cols[aligned_indices[0]:aligned_indices[-1]+1]
aln_s_ref = "".join([olc_ref for olc_ref, _ in cut_cols])
aln_s_mdl = "".join([olc_mdl for _, olc_mdl in cut_cols])
covered_aln = seq.CreateAlignment(
seq.CreateSequence("REF", aln_s_ref),
seq.CreateSequence("MDL", aln_s_mdl)
)
covered_aln.SetSequenceOffset(0, s_ref_offset)
covered_aln.SetSequenceOffset(1, s_mdl_offset)
# check post assertions (as in docstring)
assert ref_sqe[covered_aln.GetSequenceOffset(0):]\
.startswith(covered_aln.sequences[0].gapless_string)
if atomseq_aln:
assert mdl_sqe[covered_aln.GetSequenceOffset(1)] \
== covered_aln.sequences[1].gapless_string[0]
assert mdl_sqe[covered_aln.GetSequenceOffset(1):].replace('-', '')\
.startswith(covered_aln.sequences[1].gapless_string)
else:
assert covered_aln.sequences[0].gapless_string \
== ref_sqe[ref_range[0]-1:ref_range[1]]
assert covered_aln.sequences[1].gapless_string \
== mdl_seqres[mdl_range[0]-1:mdl_range[1]]
assert mdl_seqres[covered_aln.GetSequenceOffset(1):]\
.startswith(covered_aln.sequences[1].gapless_string)
assert len(mdl_seqres) == len(mdl_sqe)
mdl_sqe_check = mdl_seqres.replace('X', '-')
for olc in gapped_aa:
mdl_sqe_check = mdl_sqe_check.replace(olc, '-')
assert mdl_sqe_check == mdl_sqe.replace('X', '-')
#
return mismatches, ref_range, mdl_range, covered_aln, mdl_seqres
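# Worked example for _align_sequences (illustration only): with mdl_sqe="CDE"
# and ref_sqe="ABCDEFG" the model sequence is a plain substring of the
# reference, so mismatches=[], ref_range=(3, 5), mdl_range=(1, 3),
# mdl_seqres="CDE", and covered_aln aligns "CDE" to "CDE" with sequence
# offsets 2 (REF) and 0 (MDL).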
def _get_entities(mdl_data_from_zip, metadata):
"""Gather data for the mmCIF (target) entities.
Returns (list of cif_ents, list of issues)
"""
issues = []
# merge info for matching chains
unique_chains = {} # key = sqe_gaps, value = partial cif_ent
chain_info = {ch["chain"]: {
"name": ch["name"], "up_ac": ch["up_ac"], "up_range": ch["up_range"]
} for ch in metadata["chains"]}
ost_ent = mdl_data_from_zip["ent"]
    # make sure the chain labels start with 'A' and are consistent with the
    # metadata
if _get_ch_name(ost_ent.chains[0], False) != 'A':
issues.append((
metadata['mdl_id'],
"shifted_chain_label",
(),
()
))
editor = ost_ent.EditXCS(mol.EditMode.BUFFERED_EDIT)
for chn in ost_ent.chains:
editor.RenameChain(chn, shift_letter(_get_ch_name(chn, False)))
editor.UpdateICS()
for chn in ost_ent.chains:
pdb_chain_id = _get_ch_name(chn, False)
if pdb_chain_id not in chain_info:
raise RuntimeError(
f"Non-described chain {pdb_chain_id} in " \
f"{metadata['mdl_id']}/{mdl_data_from_zip['mdl_file_base']}"
)
sqe_gaps = _get_sequence(chn)
cif_ent = {
"pdb_sequence": sqe_gaps,
"pdb_chain_ids": [_get_ch_name(chn, False)],
"description": chain_info[pdb_chain_id]["name"],
"up_ac": chain_info[pdb_chain_id]["up_ac"],
# expected up range as parsed in metadata
"exp_up_range": chain_info[pdb_chain_id]["up_range"],
}
if sqe_gaps in unique_chains:
other_cif_ent = unique_chains[sqe_gaps]
# sanity checks
for key, value in other_cif_ent.items():
if key != "pdb_chain_ids" and value != cif_ent[key]:
raise RuntimeError(
f"Inconsistent info {key} for identical chains for " \
f"chain {pdb_chain_id} vs chains " \
f"{other_cif_ent['pdb_chain_ids']}."
)
# add to list of chains
other_cif_ent['pdb_chain_ids'].append(pdb_chain_id)
else:
unique_chains[sqe_gaps] = cif_ent
# sort by model chain name (should ensure same order of chains in mmCIF)
entities = sorted(
unique_chains.values(),
key=lambda x: min(x["pdb_chain_ids"])
)
# compare with info from UP and complete data to return
for cif_ent in entities:
sqe_gaps = cif_ent["pdb_sequence"]
up_ac = cif_ent["up_ac"]
if metadata['mdl_id'] == "263-Spombe-Mrc1_like_domain-Mcm2_NTD-Cdc45-H31-H4_tetramer" and cif_ent["pdb_chain_ids"][0] == 'C':
#special case with 5 pdb_extra aa in one chain in one model, the allignment is forced to start at position 6
position_shift = 5
up_data_original = _fetch_upkb_cached(sqe_gaps[position_shift:], up_ac)
up_data = up_data_original.copy()
if up_data['mdl_range'][0] == 1: up_data['mdl_range'] = tuple(range + position_shift for range in up_data['mdl_range'])
up_data_full = _fetch_upkb_cached(sqe_gaps, up_ac)
up_data['mismatches']= [tuple([854, 209, 'D', 'E'])]
up_data['mdl_seqres'] = up_data_full['mdl_seqres']
_warn_msg("mdl_263 handeled with special harcoded alignment for chain 'C'")
else:
up_data = _fetch_upkb_cached(sqe_gaps, up_ac)
num_extra_ref = len(up_data["up_sequence"]) - (up_data["up_range"][1] - up_data["up_range"][0] + 1)
len_mdl_covered = (up_data["mdl_range"][1] - up_data["mdl_range"][0] + 1)
num_extra_mdl = len(sqe_gaps) - len_mdl_covered
if len(up_data["mismatches"]) > 0 or num_extra_ref > 0 or num_extra_mdl > 0:
# ok to cover subset of UP usually (e.g. Ubiquitin), rest big issue
if len(up_data["mismatches"]) > 0 or num_extra_mdl > 0:
issue_type = "up_mismatch"
else:
issue_type = "up_extra"
if cif_ent['exp_up_range'] == None:
cif_up_range = (1, len(up_data["up_sequence"]))
else:
cif_up_range = tuple(
map(int, cif_ent['exp_up_range'].split('-')))
if (issue_type == "up_extra" and up_data["up_range"] != cif_up_range) or issue_type == "up_mismatch":
chain_names = ",".join(cif_ent["pdb_chain_ids"])
short_data = (
mdl_data_from_zip['mdl_file_base'], chain_names, up_ac,
len_mdl_covered, len(up_data["mismatches"]), num_extra_ref, num_extra_mdl
)
long_data = (up_data["mismatches"], up_data["up_range"], up_data["mdl_range"])
issues.append(
(metadata['mdl_id'], issue_type, short_data, long_data)
)
# cannot deal with gapped sequences here as we cannot map to scores
if sqe_gaps != up_data["mdl_seqres"]:
issues.append((
metadata['mdl_id'],
"gapped_seq",
(cif_ent['pdb_chain_ids']),
(sqe_gaps, up_data["mdl_seqres"])
))
cif_ent["seqres"] = up_data["mdl_seqres"]
cif_ent.update(up_data)
return entities, issues
def _get_cf_config(cf_config, ur30_db_version=None, tpl_db=None,
tpl_db_version=None):
"""Define ColabFold setup.
Extra info needed from depositor for DBs used (depend on MMseqs2 server)
- ur30_db_version options: see dict in _get_sequence_dbs
- tpl_db options: None, "PDB70", "PDB100"
- tpl_db_version options: see dict in _get_sequence_dbs
-> can be set to None if DB not used at all (incl. custom tpls)
Note on versions used over time
- first: 2021_03 version of UniRef30, unclear what PDB70
- after 13.7.22: updated the UniRef30 to 2022_02 and PDB70 to 220313
- after 12.6.23: UniRef30 2023_02, PDB100 (instead of PDB70) 230517
- also to define if DB used at all for tpls or custom tpls
- db versions only relevant if actually used
"""
# NOTES:
# - UP-TO-DATE (as of March 2024) generic parser given a config.json dict
# - custom MSA is assumed to be complemented with extra step (as for Jason)
# keep version indep. of params (and add commit since versions are meh)
cf_version = cf_config["version"]
if "commit" in cf_config and cf_config["commit"] is not None:
cf_version += f" ({cf_config['commit'][:7]})"
# drop fields which are not relevant for model building
cf_config = cf_config.copy()
for key in ["num_queries", "commit", "version", "user_agent"]:
if key in cf_config:
del cf_config[key]
# NOTE: following code from
# https://github.com/sokrypton/ColabFold/blob/main/colabfold/batch.py to
# understand config
# -> should be backward compatible with Tara and Niko sets
# -> see also https://github.com/sokrypton/ColabFold/wiki/v1.5.0
# deal with old names (some settings changed name in v1.5)
# -> code taken almost verbatim from https://github.com/sokrypton/ColabFold
old_names = {"MMseqs2 (UniRef+Environmental)": "mmseqs2_uniref_env",
"MMseqs2 (UniRef only)": "mmseqs2_uniref",
"unpaired+paired": "unpaired_paired",
"AlphaFold2-multimer-v1": "alphafold2_multimer_v1",
"AlphaFold2-multimer-v2": "alphafold2_multimer_v2",
"AlphaFold2-multimer-v3": "alphafold2_multimer_v3",
"AlphaFold2-ptm": "alphafold2_ptm",
"AlphaFold2": "alphafold2"}
msa_mode = old_names.get(cf_config["msa_mode"], cf_config["msa_mode"])
if "pair_mode" in cf_config:
pair_mode = old_names.get(cf_config["pair_mode"], cf_config["pair_mode"])
model_type = old_names.get(cf_config["model_type"], cf_config["model_type"])
# fix v1.5 defaults for num_recycles and recycle_early_stop_tolerance
# -> def. (set as "null" in config):
# - num_recycles == 20 if alphafold2_multimer_v3 else 3
# - recycle_early_stop_tolerance == 0.5 if multimer else 0.0
# -> valid from 1.5.0 until 1.5.5 (and probably later)
# -> defined in alphafold/model/config.py of steineggerlab/alphafold repo
if "num_recycles" in cf_config and cf_config["num_recycles"] is None:
if "multimer" in model_type and model_type not in [
"alphafold2_multimer_v1", "alphafold2_multimer_v2"
]:
cf_config["num_recycles"] = 20
else:
cf_config["num_recycles"] = 3
if "recycle_early_stop_tolerance" in cf_config \
and cf_config["recycle_early_stop_tolerance"] is None:
cf_config["recycle_early_stop_tolerance"] = \
0.5 if "multimer" in model_type else 0.0
# remove null config entries (ASSUME: None = use default)
cf_config = {k: v for k, v in cf_config.items() if v is not None}
# fetch relevant data
# -> MSA mode
if msa_mode == "mmseqs2_uniref_env":
seq_dbs = ["UniRef", "Environmental"]
use_mmseqs = True
use_msa = True
elif msa_mode == "mmseqs2_uniref":
seq_dbs = ["UniRef"]
use_mmseqs = True
use_msa = True
elif msa_mode == "single_sequence":
seq_dbs = []
use_mmseqs = False
use_msa = False
elif msa_mode == "custom":
seq_dbs = []
use_mmseqs = False
use_msa = True
else:
raise ValueError(f"Unknown msa_mode {cf_config['msa_mode']}")
# -> model type
if model_type == "alphafold2_multimer_v1":
# AF-Multimer as introduced in AlphaFold v2.1.0
use_multimer = True
multimer_version = 1
elif model_type == "alphafold2_multimer_v2":
# AF-Multimer as introduced in AlphaFold v2.2.0
use_multimer = True
multimer_version = 2
elif model_type == "alphafold2_multimer_v3":
# AF-Multimer as introduced in AlphaFold v2.3.0
use_multimer = True
multimer_version = 3
elif model_type == "alphafold2_ptm":
use_multimer = False
multimer_version = None
else:
raise ValueError(f"Unknown model_type {cf_config['model_type']}")
# write modeling description
mdl_description = f"Model generated using ColabFold v{cf_version}"
if use_multimer:
mdl_description += f" with AlphaFold-Multimer (v{multimer_version})"
else:
mdl_description += " with AlphaFold"
# early stopping feature of ColabFold
upto_mdl = ""
upto_rec = ""
if cf_config.get("stop_at_score", 100) < 100:
upto_mdl = "up to "
upto_rec = "up to "
if cf_config.get("recycle_early_stop_tolerance", 0) > 0:
upto_rec = "up to "
if cf_config.get("num_seeds", 1) > 1:
mdl_str = f"{cf_config['num_models'] * cf_config['num_seeds']} " \
f"models ({cf_config['num_seeds']} random seeds per " \
f"parameter set)"
else:
mdl_str = f"{cf_config['num_models']} models"
mdl_description += f" producing {upto_mdl}{mdl_str} with {upto_rec}" \
f"{cf_config['num_recycles']} recycles each"
if cf_config.get("use_amber", False) or \
cf_config.get("num_relax", 0) > 0:
mdl_description += ", with AMBER relaxation"
else:
mdl_description += ", without model relaxation"
if cf_config["use_templates"]:
# tpl_db == None meant to mean that custom templates were used
# -> no need to stress it but just visible in search DBs
mdl_description += ", using templates"
else:
mdl_description += ", without templates"
tpl_db = None
tpl_db_version = None
if cf_config["rank_by"] == "plddt":
mdl_description += ", ranked by pLDDT"
elif cf_config["rank_by"] == "ptmscore":
mdl_description += ", ranked by pTM"
elif cf_config["rank_by"] == "multimer":
mdl_description += ", ranked by 80*ipTM+20*pTM"
else:
raise ValueError(f"Unknown rank_by {cf_config['rank_by']}")
if use_msa:
mdl_description += ", starting from"
if use_mmseqs:
msa_type = "MSA"
else:
msa_type = "custom MSA"
if use_multimer:
if pair_mode == "unpaired_paired":
mdl_description += f" paired and unpaired {msa_type}s"
elif pair_mode == "paired":
mdl_description += f" paired {msa_type}s"
elif pair_mode == "unpaired":
mdl_description += f" unpaired {msa_type}s"
else:
raise ValueError(f"Unknown pair_mode {cf_config['pair_mode']}")
elif msa_type.startswith('M'):
mdl_description += f" an {msa_type}"
else:
mdl_description += f" a {msa_type}"
if use_mmseqs:
mdl_description += f" from MMseqs2 ({'+'.join(seq_dbs)})"
else:
mdl_description += " without an MSA"
mdl_description += "."
return {
"params": cf_config,
"version": cf_version,
"seq_dbs": seq_dbs,
"use_mmseqs": use_mmseqs,
"use_msa": use_msa,
"ur30_db_version": ur30_db_version,
"tpl_db": tpl_db,
"tpl_db_version": tpl_db_version,
"use_multimer": use_multimer,
"multimer_version": multimer_version,
"description": mdl_description,
}
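# Usage sketch for _get_cf_config (assumed minimal config, illustration only):
#   cfg = {"version": "1.5.2", "msa_mode": "mmseqs2_uniref_env",
#          "pair_mode": "unpaired_paired",
#          "model_type": "alphafold2_multimer_v3", "num_models": 5,
#          "num_recycles": None, "use_templates": False, "use_amber": False,
#          "rank_by": "multimer"}
#   _get_cf_config(cfg, ur30_db_version="2023_02")
# would report use_multimer=True, multimer_version=3, num_recycles fixed to
# the v1.5 multimer-v3 default of 20, and a human-readable description that
# ends up as the details of the modelling protocol step.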
def _get_mmseqs2_software(version=None):
"""Get MMseqs2 as a dictionary, suitable to create a modelcif software
object."""
return {
"name": "MMseqs2",
"classification": "data collection",
"description": "Many-against-Many sequence searching",
"citation": ihm.citations.mmseqs2,
"location": "https://github.com/soedinglab/mmseqs2",
"type": "package",
"version": version,
}
def _get_colabfold_software(version=None):
"""Get ColabFold as a dictionary, suitable to create a modelcif software
object."""
return {
"name": "ColabFold",
"classification": "model building",
"description": "Structure prediction",
"citation": ihm.citations.colabfold,
"location": "https://github.com/sokrypton/ColabFold",
"type": "package",
"version": version,
}
def _get_af2_software(version=None, is_multimer=False):
"""Get AF2 as dictionary, suitable to create a modelcif software object."""
if is_multimer:
return {
"name": "AlphaFold-Multimer",
"classification": "model building",
"description": "Structure prediction",
"citation": ihm.Citation(
pmid=None,
title="Protein complex prediction with "
+ "AlphaFold-Multimer.",
journal="bioRxiv",
volume=None,
page_range=None,
year=2021,
authors=[
"Evans, R.",
"O'Neill, M.",
"Pritzel, A.",
"Antropova, N.",
"Senior, A.",
"Green, T.",
"Zidek, A.",
"Bates, R.",
"Blackwell, S.",
"Yim, J.",
"Ronneberger, O.",
"Bodenstein, S.",
"Zielinski, M.",
"Bridgland, A.",
"Potapenko, A.",
"Cowie, A.",
"Tunyasuvunakool, K.",
"Jain, R.",
"Clancy, E.",
"Kohli, P.",
"Jumper, J.",
"Hassabis, D.",
],
doi="10.1101/2021.10.04.463034",
),
"location": "https://github.com/deepmind/alphafold",
"type": "package",
"version": version,
}
else:
return {
"name": "AlphaFold",
"classification": "model building",
"description": "Structure prediction",
"citation": ihm.citations.alphafold2,
"location": "https://github.com/deepmind/alphafold",
"type": "package",
"version": version,
}
def _get_protocol_steps_and_software(cf_config):
"""Create the list of protocol steps with software and parameters used."""
protocol = []
# build up SW
sw_plus_params = [
(
_get_colabfold_software(cf_config["version"]), cf_config["params"]
)
]
if cf_config["use_mmseqs"]:
sw_plus_params.append((_get_mmseqs2_software(), {}))
sw_plus_params.append((
_get_af2_software(is_multimer=cf_config["use_multimer"]), {}
))
# modelling step
protocol.append({
"method_type": "modeling",
"name": None,
"details": cf_config["description"],
"input": "target_sequences_and_ref_DBs",
"output": "model",
"software_plus_params": sw_plus_params,
})
return protocol
def _get_title(metadata):
"""Get a title for this modelling experiment."""
return metadata["title"].strip()
def _get_model_details(metadata):
"""Get the model description."""
return metadata["abstract"].strip()
################################################################################
################################################################################
# ModelCIF HANDLING
################################################################################
# pylint: disable=too-few-public-methods
class _GlobalPTM(modelcif.qa_metric.Global, modelcif.qa_metric.PTM):
"""Predicted accuracy according to the TM-score score in [0,1]"""
name = "pTM"
software = None
class _GlobalIpTM(modelcif.qa_metric.Global, modelcif.qa_metric.IpTM):
"""Predicted protein-protein interface score based on TM-score in [0,1]"""
name = "ipTM"
software = None
class _GlobalPLDDT(modelcif.qa_metric.Global, modelcif.qa_metric.PLDDT):
"""Predicted accuracy according to the CA-only lDDT in [0,100]"""
name = "pLDDT"
software = None
class _LocalPLDDT(modelcif.qa_metric.Local, modelcif.qa_metric.PLDDT):
"""Predicted accuracy according to the CA-only lDDT in [0,100]"""
name = "pLDDT"
software = None
class _LocalPairwisePAE(modelcif.qa_metric.LocalPairwise, modelcif.qa_metric.PAE):
"""Predicted aligned error (in Angstroms)"""
name = "PAE"
software = None
class _LPeptideAlphabetWithXO(ihm.LPeptideAlphabet):
"""Have the default amino acid alphabet plus 'X' for unknown residues
and 'O' as allowed non-def. AA (U already in alphabet)."""
# extra entry added according to LPeptideAlphabet def. in
# https://python-ihm.readthedocs.io/en/latest/_modules/ihm.html
# and https://files.rcsb.org/view/1NTH.cif for values for 'O'.
def __init__(self):
"""Create the alphabet."""
super().__init__()
self._comps["X"] = self._comps["UNK"]
self._comps['O'] = ihm.LPeptideChemComp(
"PYL", "O", "O", "PYRROLYSINE", "C12 H21 N3 O3"
)
# pylint: enable=too-few-public-methods
class _OST2ModelCIF(modelcif.model.AbInitioModel):
"""Map OST entity elements to ihm.model"""
def __init__(self, *args, **kwargs):
"""Initialise a model"""
for i in ["ost_entity", "asym", "scores_json", "incl_pae"]:
if i not in kwargs:
raise TypeError(f"Required keyword argument '{i}' not found.")
self.ost_entity = kwargs.pop("ost_entity")
self.asym = kwargs.pop("asym")
self.scores_json = kwargs.pop("scores_json")
self.incl_pae = kwargs.pop("incl_pae")
# use auth IDs for res. nums and chain names
self.use_auth = False
# what accuracy to use for PAE? (writer uses 3 anyway)
self.pae_digits = 3
super().__init__(*args, **kwargs)
def get_atoms(self):
# ToDo [internal]: Take B-factor out since its not a B-factor?
# NOTE: this assumes that _get_res_num maps residue to pos. in seqres
# within asym
for atm in self.ost_entity.atoms:
yield modelcif.model.Atom(
asym_unit=self.asym[_get_ch_name(atm.chain, self.use_auth)],
seq_id=_get_res_num(atm.residue, self.use_auth),
atom_id=atm.name,
type_symbol=atm.element,
x=atm.pos[0],
y=atm.pos[1],
z=atm.pos[2],
het=atm.is_hetatom,
biso=atm.b_factor,
occupancy=atm.occupancy,
)
def add_scores(self):
"""Add QA metrics from AF2 scores."""
# global scores
self.qa_metrics.extend(
(
_GlobalPLDDT(self.scores_json["plddt_global"]),
_GlobalPTM(self.scores_json["ptm"]),
)
)
if "iptm" in self.scores_json:
self.qa_metrics.extend(
(
_GlobalIpTM(self.scores_json["iptm"]),
)
)
        # NOTE: none of the below is expected to work if we have unmodelled
        # gaps!
# local scores
lpae = []
i = 0
for chn_i in self.ost_entity.chains:
ch_name_i = _get_ch_name(chn_i, self.use_auth)
for res_i in chn_i.residues:
# local pLDDT
res_num_i = _get_res_num(res_i, self.use_auth)
self.qa_metrics.append(
_LocalPLDDT(
self.asym[ch_name_i].residue(res_num_i),
self.scores_json["plddt"][i],
)
)
# pairwise alignment error
if self.incl_pae:
j = 0
for chn_j in self.ost_entity.chains:
ch_name_j = _get_ch_name(chn_j, self.use_auth)
for res_j in chn_j.residues:
res_num_j = _get_res_num(res_j, self.use_auth)
pae_ij = self.scores_json["pae"][i][j]
lpae.append(
_LocalPairwisePAE(
self.asym[ch_name_i].residue(res_num_i),
self.asym[ch_name_j].residue(res_num_j),
round(pae_ij, self.pae_digits),
)
)
j += 1
i += 1
if self.incl_pae:
self.qa_metrics.extend(lpae)
def _get_modelcif_entities(target_ents, asym_units, system):
"""Create ModelCIF entities and asymmetric units."""
alphabet = _LPeptideAlphabetWithXO()
for cif_ent in target_ents:
# collect references
up_ref = modelcif.reference.UniProt(
code=cif_ent["up_id"],
accession=cif_ent["up_ac"],
isoform=cif_ent["up_isoform"],
ncbi_taxonomy_id=cif_ent["up_ncbi_taxid"],
organism_scientific=cif_ent["up_organism"],
sequence_version_date=cif_ent["up_last_mod"],
sequence_crc64=cif_ent["up_crc64"],
sequence=cif_ent["up_sequence"],
)
# ASSUME: full model covered w/o mismatches
# -> NOTE: sequence passed above is cut based on alignments!
up_ref.alignments.append(modelcif.reference.Alignment(
db_begin=cif_ent["up_range"][0],
db_end=cif_ent["up_range"][1],
entity_begin=cif_ent["mdl_range"][0],
entity_end=cif_ent["mdl_range"][1],
seq_dif=[
ihm.reference.SeqDif(
mismatch[1],
alphabet[mismatch[2]],
alphabet[mismatch[3]],
) for mismatch in cif_ent["mismatches"]
]
))
#
references = [up_ref]
# combine into ModelCIF entity
mdlcif_ent = modelcif.Entity(
cif_ent["seqres"],
description=cif_ent["description"],
alphabet=alphabet,
source=ihm.source.Natural(
ncbi_taxonomy_id=cif_ent["up_ncbi_taxid"],
scientific_name=cif_ent["up_organism"],
),
references=references,
)
# NOTE: this assigns (potentially new) alphabetic chain names
for pdb_chain_id in cif_ent["pdb_chain_ids"]:
asym_units[pdb_chain_id] = modelcif.AsymUnit(
mdlcif_ent, strand_id=pdb_chain_id,
)
system.entities.append(mdlcif_ent)
def _get_assoc_pae_file(entry_id, mdl_name):
"""Generate a associated file object to extract PAE to extra file."""
return modelcif.associated.LocalPairwiseQAScoresFile(
f"{mdl_name}_local_pairwise_qa.cif",
categories=["_ma_qa_metric_local_pairwise"],
copy_categories=["_ma_qa_metric"],
entry_id=entry_id,
entry_details="This file is an associated file consisting "
+ "of local pairwise QA metrics. This is a partial mmCIF "
+ "file and can be validated by merging with the main "
+ "mmCIF file containing the model coordinates and other "
+ "associated data.",
details="Predicted aligned error",
)
def _get_assoc_png_file(fle_path, png_type):
"""Generate a modelcif.associated.File object pointing to PNG file
with content defined by png_type (coverage, plddt, or pae).
"""
details = {
"coverage": "PNG file showing number of sequences in the MSA covering "
+ "each position in the target sequences",
"plddt": "PNG file showing pLDDT at each residue position for each "
+ "of the 5 models produced",
"pae": "PNG file showing the PAE matrices for each of the 5 models "
+ "produced",
}
afile = modelcif.associated.File(
fle_path,
details=details[png_type],
)
# NOTE: file_format can be set to "png" in future ModelCIF versions
# (i.e. when https://github.com/ihmwg/ModelCIF/issues/17 is resolved)
afile.file_format = "other"
afile.file_content = "other"
return afile
def _get_assoc_mdl_file(fle_path, data_json):
"""Generate a modelcif.associated.File object that looks like a CIF file.
The dedicated CIFFile functionality in modelcif would also try to write it.
"""
details=f"#{data_json['mdl_rank']} ranked model; " + f"pLDDT {round(data_json['plddt_global'], 1)}, " + f"pTM {round(data_json['ptm'], 3)}"
if 'iptm' in data_json:
details = details + f", ipTM {round(data_json['iptm'], 3)}"
cfile = modelcif.associated.File(fle_path, details)
cfile.file_format = "cif"
return cfile
def _get_assoc_zip_file(fle_path, data_json):
"""Create a modelcif.associated.File object that looks like a ZIP file.
This is NOT the archive ZIP file for the PAEs but to store that in the
ZIP archive of the selected model."""
zfile = modelcif.associated.File(
fle_path,
details="archive with multiple files for "
+ f"#{data_json['mdl_rank']} ranked model",
)
zfile.file_format = "other"
return zfile
def _get_associated_files(mdl_name, arc_files):
"""Create entry for associated files."""
# package all into zip file
return modelcif.associated.Repository(
"",
[modelcif.associated.ZipFile(f"{mdl_name}.zip", files=arc_files)],
)
# NOTE: by convention MA expects zip file with same name as model-cif
def _get_sequence_dbs(config_data):
"""Get ColabFold seq. DBs."""
# Uses HC list of known DBs used in ColabFold
# -> see also notes in _get_config
db_dict = {
"UniRef_2021_03": modelcif.ReferenceDatabase(
"UniRef30",
"https://wwwuser.gwdg.de/~compbiol/colabfold/uniref30_2103.tar.gz",
version="2021_03",
),
"UniRef_2022_02": modelcif.ReferenceDatabase(
"UniRef30",
"https://wwwuser.gwdg.de/~compbiol/colabfold/uniref30_2202.tar.gz",
version="2022_02",
),
"UniRef_2023_02": modelcif.ReferenceDatabase(
"UniRef30",
"https://wwwuser.gwdg.de/~compbiol/colabfold/uniref30_2302.tar.gz",
version="2023_02",
),
"Environmental": modelcif.ReferenceDatabase(
"ColabFold DB",
"https://wwwuser.gwdg.de/~compbiol/colabfold/"
+ "colabfold_envdb_202108.tar.gz",
version="2021_08",
),
"PDB100_230517": modelcif.ReferenceDatabase(
"PDB100",
"https://wwwuser.gwdg.de/~compbiol/data/hhsuite/databases/"
+ "hhsuite_dbs/pdb100_foldseek_230517.tar.gz",
release_date=datetime.datetime(2023, 5, 17)
),
"PDB70_211027": modelcif.ReferenceDatabase(
"PDB70",
"https://wwwuser.gwdg.de/~compbiol/data/hhsuite/databases/"
+ "hhsuite_dbs/pdb70_from_mmcif_211027.tar.gz",
release_date=datetime.datetime(2021, 10, 27)
),
"PDB70_211117": modelcif.ReferenceDatabase(
"PDB70",
"https://wwwuser.gwdg.de/~compbiol/data/hhsuite/databases/"
+ "hhsuite_dbs/pdb70_from_mmcif_211117.tar.gz",
release_date=datetime.datetime(2021, 11, 17)
),
"PDB70_220313": modelcif.ReferenceDatabase(
"PDB70",
"https://wwwuser.gwdg.de/~compbiol/data/hhsuite/databases/"
+ "hhsuite_dbs/pdb70_from_mmcif_220313.tar.gz",
release_date=datetime.datetime(2022, 3, 13)
),
}
# fill list of DBs
seq_dbs = []
search_keys = []
for seq_db in config_data["seq_dbs"]:
if seq_db == "UniRef":
if config_data['ur30_db_version'] is None:
raise ValueError("Cannot use UniRef without version")
search_key = f"UniRef_{config_data['ur30_db_version']}"
else:
search_key = seq_db
search_keys.append(search_key)
if config_data["tpl_db"] is not None:
if config_data["tpl_db_version"] is None:
raise ValueError("Cannot have tpl DB without version")
search_keys.append(
f"{config_data['tpl_db']}_{config_data['tpl_db_version']}"
)
for search_key in search_keys:
if search_key not in db_dict:
raise ValueError(f"Unknown seq. DB {search_key}")
seq_dbs.append(db_dict[search_key])
return seq_dbs
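# Usage sketch for _get_sequence_dbs (illustration only): a config_data dict
# with seq_dbs=["UniRef", "Environmental"], ur30_db_version="2023_02" and
# tpl_db=None yields two modelcif.ReferenceDatabase objects (UniRef30 2023_02
# and the ColabFold environmental DB); requesting "UniRef" without a version
# raises a ValueError.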
def _assemble_modelcif_software(soft_dict, params_dict):
"""Create a modelcif.SoftwareWithParameters instance from dictionaries."""
# create SW object
sw = modelcif.Software(
soft_dict["name"],
soft_dict["classification"],
soft_dict["description"],
soft_dict["location"],
soft_dict["type"],
soft_dict["version"],
citation=soft_dict["citation"],
)
# assemble parameters
params = []
for key, val in params_dict.items():
params.append(modelcif.SoftwareParameter(key, val))
# put them together
return modelcif.SoftwareWithParameters(sw, params)
def _get_modelcif_protocol_software(js_step):
"""Assemble software entries for a ModelCIF protocol step."""
# new setup in python-modelcif (as of late 2023): params with each SW
sw_list = []
for sw, sw_params in js_step["software_plus_params"]:
sw_list.append(_assemble_modelcif_software(sw, sw_params))
# group and done...
if sw_list:
return modelcif.SoftwareGroup(sw_list)
else:
return None
def _get_modelcif_protocol_data(data_label, target_entities, model, ref_dbs):
"""Assemble data for a ModelCIF protocol step."""
if data_label == "target_sequences_and_ref_DBs":
data = modelcif.data.DataGroup(target_entities)
data.extend(ref_dbs)
elif data_label == "model":
data = model
else:
raise RuntimeError(f"Unknown protocol data: '{data_label}'")
return data
def _get_modelcif_protocol(protocol_steps, target_entities, model, ref_dbs):
"""Create the protocol for the ModelCIF file."""
protocol = modelcif.protocol.Protocol()
for js_step in protocol_steps:
sftwre = _get_modelcif_protocol_software(js_step)
input_data = _get_modelcif_protocol_data(
js_step["input"], target_entities, model, ref_dbs
)
output_data = _get_modelcif_protocol_data(
js_step["output"], target_entities, model, ref_dbs
)
protocol.steps.append(
modelcif.protocol.Step(
input_data=input_data,
output_data=output_data,
name=js_step["name"],
details=js_step["details"],
software=sftwre,
)
)
protocol.steps[-1].method_type = js_step["method_type"]
return protocol
def _compress_cif_file(cif_file):
"""Compress cif file and delete original."""
with open(cif_file, "rb") as f_in:
with gzip.open(cif_file + ".gz", "wb") as f_out:
shutil.copyfileobj(f_in, f_out)
os.remove(cif_file)
def _package_associated_files(repo):
"""Compress associated files into single zip file and delete original."""
# zip settings tested for good speed vs compression
for archive in repo.files:
with zipfile.ZipFile(archive.path, "w", zipfile.ZIP_BZIP2) as cif_zip:
for zfile in archive.files:
cif_zip.write(zfile.path, arcname=zfile.path)
os.remove(zfile.path)
def _store_as_modelcif(data_json, ost_ent, out_dir, mdl_name, compress,
add_pae, add_pngs, add_files):
"""Mix all the data into a ModelCIF file."""
print(" generating ModelCIF objects...", end="")
pstart = timer()
# create system to gather all the data
system = modelcif.System(
title=data_json["title"],
id=data_json["mdl_id"].upper(),
model_details=data_json["model_details"],
)
# create an asymmetric unit and an entity per target sequence
asym_units = {}
_get_modelcif_entities(data_json["target_entities"], asym_units, system)
# audit_authors
system.authors.extend(data_json["audit_authors"])
# set up the model to produce coordinates
model = _OST2ModelCIF(
assembly=modelcif.Assembly(asym_units.values()),
asym=asym_units,
ost_entity=ost_ent,
scores_json=data_json,
name=data_json["mdl_name"],
incl_pae=add_pae,
)
print(f" ({timer()-pstart:.2f}s)")
print(" processing QA scores...", end="", flush=True)
pstart = timer()
model.add_scores()
print(f" ({timer()-pstart:.2f}s)")
model_group = modelcif.model.ModelGroup([model])
system.model_groups.append(model_group)
# handle additional files
arc_files = []
if add_pae:
arc_files.append(_get_assoc_pae_file(system.id, mdl_name))
if add_pngs:
for png_type in ["coverage", "plddt", "pae"]:
arc_files.append(_get_assoc_png_file(
data_json[f"{png_type}_png_file_name"], png_type
))
arc_files.extend(add_files)
if arc_files:
system.repositories.append(_get_associated_files(mdl_name, arc_files))
# get data and steps
ref_dbs = _get_sequence_dbs(data_json["cf_config"])
protocol = _get_modelcif_protocol(
data_json["protocol"], system.entities, model, ref_dbs,
)
system.protocols.append(protocol)
# write modelcif System to file
print(" write to disk...", end="", flush=True)
pstart = timer()
# NOTE: this will dump PAE on path provided in add_scores
# -> hence we cheat by changing path and back while being exception-safe...
oldpwd = os.getcwd()
os.chdir(out_dir)
mdl_fle = f"{mdl_name}.cif"
try:
with open(mdl_fle, "w", encoding="ascii") as mmcif_fh:
modelcif.dumper.write(mmcif_fh, [system])
if add_pngs:
for png_type in ["coverage", "plddt", "pae"]:
with open(data_json[f"{png_type}_png_file_name"], "wb") as fh:
fh.write(data_json[f"{png_type}_png_data"])
if arc_files:
_package_associated_files(system.repositories[0])
if compress:
_compress_cif_file(mdl_fle)
mdl_fle += ".gz"
finally:
os.chdir(oldpwd)
print(f" ({timer()-pstart:.2f}s)")
assoc_files = [_get_assoc_mdl_file(mdl_fle, data_json)]
if arc_files:
assoc_files.append(
_get_assoc_zip_file(system.repositories[0].files[0].path, data_json)
)
return assoc_files
################################################################################
################################################################################
# HANDLE FULL DATA SET
################################################################################
def _get_cf_db_versions(dt):
"""Returns the DB versions and name used to generate the model in colabfold for a given date"""
# logic: newest first, tuple with ur30_db_version, tpl_db, tpl_db_version
# -> set to UNK if +-1 day or for unknown tpl_db_version
switch_dates = [
(datetime.datetime(2023, 6, 12), ("2023_02", "PDB100", "230517")),
(datetime.datetime(2022, 7, 13), ("2022_02", "PDB70", "220313")),
(datetime.datetime(2021, 1, 1), ("2021_03", "PDB70", "UNK")),
]
for switch_dt, dbs in switch_dates:
dd = (dt - switch_dt).days
if abs(dd) <= 1:
return ("UNK", "UNK", "UNK")
elif dd > 1:
return dbs
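# Examples for _get_cf_db_versions (illustration only):
#   _get_cf_db_versions(datetime.datetime(2023, 8, 1))
#     -> ("2023_02", "PDB100", "230517")
#   _get_cf_db_versions(datetime.datetime(2023, 6, 13))
#     -> ("UNK", "UNK", "UNK")   # within one day of a DB switch
# Dates more than one day before 2021-01-01 fall through and return None.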
def _translate2modelcif_single(
metadata, opts, mdl_id, data_from_zip, entities, mdl_rank, add_files=[]
):
"""Convert a single model with its accompanying data to ModelCIF."""
mdl_data_from_zip = data_from_zip[f"rank_{mdl_rank}"]
mdl_af_id = mdl_data_from_zip["mdl_id"]
if mdl_rank > 1:
mdl_id += f"_rank_{mdl_rank}_{mdl_af_id}"
print(f" translating {mdl_id}...")
pdb_start = timer()
# gather data into JSON-like structure
print(" preparing data...", end="")
pstart = timer()
mdlcf_json = {}
config_dict = metadata["config_dict"].copy()
ur30_db_version = _get_cf_db_versions(metadata["Final_date"])
cf_config = _get_cf_config(config_dict, ur30_db_version[0])
mdlcf_json["audit_authors"] = _get_audit_authors()
mdlcf_json["protocol"] = _get_protocol_steps_and_software(cf_config)
mdlcf_json["cf_config"] = cf_config
mdlcf_json["mdl_id"] = mdl_id # used for entry ID
mdlcf_json["mdl_rank"] = mdl_rank
if mdl_rank == 1:
mdlcf_json["mdl_name"] = f"Top ranked model ({mdl_af_id})"
else:
mdlcf_json["mdl_name"] = f"#{mdl_rank} ranked model ({mdl_af_id})"
mdlcf_json["target_entities"] = entities
for scores_key in ["plddt", "pae"]:
mdlcf_json[scores_key] = mdl_data_from_zip["scores"][scores_key]
    # global pLDDT recomputed at full precision as the mean of the per-residue
    # values; pTM/ipTM taken directly from the scores JSON (ipTM if present)
    mdlcf_json["plddt_global"] = np.mean(mdl_data_from_zip["scores"]["plddt"])
    mdlcf_json["ptm"] = mdl_data_from_zip["scores"]["ptm"]
    if "iptm" in mdl_data_from_zip["scores"]:
        mdlcf_json["iptm"] = mdl_data_from_zip["scores"]["iptm"]
#
mdlcf_json["title"] = _get_title(metadata)
if mdl_rank != 1:
mdlcf_json["title"] += f" (#{mdl_rank} ranked model)"
mdlcf_json["model_details"] = _get_model_details(metadata)
# fill PNG data
for png_type in ["coverage", "plddt", "pae"]:
mdlcf_json[f"{png_type}_png_file_name"] = f"{mdl_id}_{png_type}.png"
mdlcf_json[f"{png_type}_png_data"] = data_from_zip[
f"{png_type}_png_data"
]
print(f" ({timer()-pstart:.2f}s)")
# save ModelCIF
assoc_files = _store_as_modelcif(
data_json=mdlcf_json,
ost_ent=mdl_data_from_zip["ent"],
out_dir=opts.out_dir,
mdl_name=mdl_id,
compress=(mdl_rank == 1 and opts.compress),
add_pae=(mdl_rank == 1 and not opts.no_extra_files),
add_pngs=(mdl_rank == 1 and not opts.no_extra_files),
add_files=add_files
)
# check if result can be read and has expected seq.
mdl_path = os.path.join(opts.out_dir, assoc_files[0].path)
ent, ss = io.LoadMMCIF(mdl_path, seqres=True)
exp_seqs = []
for trg_ent in mdlcf_json["target_entities"]:
exp_seqs += [trg_ent["pdb_sequence"]] * len(trg_ent["pdb_chain_ids"])
assert ent.chain_count == len(exp_seqs), f"Bad chain count {mdl_id}"
# NOTE: here we expect auth = label IDs
ent_seq = "".join([_get_sequence(chn, False) for chn in ent.chains])
ent_seq_a = "".join([_get_sequence(chn, True) for chn in ent.chains])
assert ent_seq == ent_seq_a
assert ent_seq == "".join(exp_seqs), f"Bad seq. {mdl_id}"
ent_seqres = [ss.FindSequence(chn.name).string for chn in ent.chains]
exp_seqres = []
for trg_ent in mdlcf_json["target_entities"]:
exp_seqres += [trg_ent["seqres"]] * len(trg_ent["pdb_chain_ids"])
assert ent_seqres == exp_seqres, f"Bad seqres {mdl_id}"
print(f" ... done with {mdl_id} ({timer()-pdb_start:.2f}s).")
return assoc_files
def _translate2modelcif(metadata, opts, desired_ranks=3):
"""Convert a model with its accompanying data to ModelCIF."""
mdl_id = metadata["mdl_id"]
    # skip if already done (the check happens further down because the list of
    # issues still needs to be compiled and returned)
if opts.compress:
cifext = "cif.gz"
else:
cifext = "cif"
mdl_path = os.path.join(opts.out_dir, f"{mdl_id}.{cifext}")
# prepare data for models to convert (also gets all issues)
issues = []
ent_dict = {}
data_from_zip = _parse_zip_file(
metadata["zip_file_path"], desired_ranks=desired_ranks
)
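    # data_from_zip holds one dict per rank (keys "rank_1" .. "rank_N") plus
    # the coverage/pLDDT/PAE PNG blobs stored once at the top level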
for num in range(desired_ranks):
mdl_rank = num + 1
mdl_data_from_zip = data_from_zip[f"rank_{mdl_rank}"]
entities, ent_issues = _get_entities(mdl_data_from_zip, metadata)
issues.extend(ent_issues)
ent_dict[mdl_rank] = entities
scores_issues = _check_scores(mdl_data_from_zip, metadata, mdl_rank)
issues.extend(scores_issues)
# abort here if already done
if opts.checks_only:
return issues
if os.path.exists(mdl_path):
print(f" {mdl_id} already done...")
return issues
# convert models if needed starting from lower ranked ones
add_files = []
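    # associated files of ranks 2..N are collected here and attached to the
    # archive written together with the top-ranked model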
if not opts.no_extra_files:
for num in range(1, desired_ranks):
mdl_rank = num + 1
entities = ent_dict[mdl_rank]
assoc_files = _translate2modelcif_single(
metadata, opts, mdl_id, data_from_zip, entities, mdl_rank
)
add_files.extend(assoc_files)
# do top ranked one with assoc. files
_translate2modelcif_single(
metadata, opts, mdl_id, data_from_zip, ent_dict[1],
mdl_rank=1, add_files=add_files
)
return issues
def _get_metadata(input_data_path, single_model=None):
"""Read various metadata files and prepare for next steps.
Returns dict with key = mdl_num (int) and value = dict with:
    - mdl_id, zip_file_path, abstract, title, config_dict, Final_date
    - chains (list of dict with chain, name, up_ac, up_range)
"""
# fetch data of models generation (used for DB version)
dates_json_path = os.path.join(input_data_path, "Annotations", "dates.json")
dates_data = json.load(open(dates_json_path))
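    # dates.json is expected to map model number (str) -> dict with the model
    # creation date under "final_date" (format YYYY-MM-DD)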
# fetch and check JSON data
json_path = os.path.join(input_data_path, "Annotations", "Annotations.json")
metadata_json_dict = json.load(open(json_path))
num_models = len(metadata_json_dict)
exp_mdl_nums = list(range(1, num_models + 1))
assert [int(k) for k in metadata_json_dict] == exp_mdl_nums
for key, data in metadata_json_dict.items():
# sanity checks on data
if sorted(data.keys()) != ["abstract", "chains", "title"]:
raise RuntimeError(f"Wrong dict-keys observed for mdl {key}")
for chain in data["chains"]:
if sorted(chain.keys()) != ["chain", "name", "up_ac"]:
raise RuntimeError(
f"Wrong dict-keys observed in chain for mdl {key}"
)
                # if the model covers only a subset of the UniProt sequence,
                # the range is given in parentheses after the accession code
                paren_index = chain["up_ac"].find("(")
                if paren_index != -1:
                    # split into accession code and range (drop parentheses)
                    chain["up_range"] = chain["up_ac"][paren_index + 1:-1].strip()
                    chain["up_ac"] = chain["up_ac"][:paren_index].strip()
                else:
                    # no range present; the whole string is the accession code
                    chain["up_range"] = None
            data["Final_date"] = datetime.datetime.strptime(
                dates_data[key]["final_date"], "%Y-%m-%d"
            )
# fetch and check zip file paths
zip_files = {} # key = int(X) in [X]-[NAME].zip, value = dict
zip_path = os.path.join(input_data_path, "Zip_files")
for f_name in sorted(os.listdir(zip_path)):
if f_name.endswith(".zip"):
# assume name [X]-[NAME].zip
mdl_id = os.path.splitext(f_name)[0]
ss = mdl_id.split("-", 1)
zip_dict = {
"mdl_id": mdl_id,
"zip_file_path": os.path.join(zip_path, f_name)
}
mdl_num = int(ss[0])
# sanity check to make sure mdl_id works for alphabetic sorting
assert f"{mdl_num:03d}" == ss[0]
assert mdl_num not in zip_files
zip_files[mdl_num] = zip_dict
    # check that there are no gaps (ensures we can safely regenerate numbers
    # during import)
    assert sorted(zip_files.keys()) == list(range(1, len(zip_files) + 1))
# get configs
configs_path = os.path.join(input_data_path, "Configs")
config_dicts = {} # key = mdl_num, value = JSON content
# (default to mdl_num=1 for ones not in dict)
for f_name in sorted(os.listdir(configs_path)):
if f_name.endswith(".json"):
# file names: config_[NUM].json or config_[NUM1]_[NUM2].json
nums = os.path.splitext(f_name)[0].split('_')[1:]
config_data = json.load(open(os.path.join(configs_path, f_name)))
for num in nums:
assert int(num) not in config_dicts
assert num in metadata_json_dict
config_dicts[int(num)] = config_data
# combine data
metadata_all = {} # key = mdl_num (int), value = dict
for key, data in metadata_json_dict.items():
# restrict if needed
if single_model is not None and key != single_model:
continue
mdl_num = int(key)
metadata_all[mdl_num] = data
data.update(zip_files[mdl_num])
data["config_dict"] = config_dicts.get(mdl_num, config_dicts[1])
return metadata_all
def _main():
"""Run as script."""
# parse/fetch global data
opts = _parse_args()
metadata_all = _get_metadata(opts.input_data_path, opts.single_model)
# iterate over models
print(f"Working on models in {opts.input_data_path}...")
issues = []
for metadata in metadata_all:
new_issues = _translate2modelcif(metadata_all[metadata], opts)
issues.extend(new_issues)
print(f"... done with models in {opts.input_data_path}.")
if opts.single_model is None:
# dump issues
issues_file_path = os.path.join(opts.out_dir, "issues.json")
json.dump(issues, open(issues_file_path, "w"))
# dump info on which ones to export to 3D-Beacons
to_export_file_path = os.path.join(opts.out_dir, "MA_to_export.json")
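        # export flag per model: True if the abstract describes the complex as
        # a known interaction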
to_export = {
metadata_all[metadata]["mdl_id"]: ("known interaction" in metadata_all[metadata]["abstract"]) \
for metadata in metadata_all
}
json.dump(to_export, open(to_export_file_path, "w"))
# TEST: to judge res. needed on cluster
import resource
print('mem', resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1000)
if __name__ == "__main__":
_main()