diff --git a/projects/2024-12-ma-denv/translate2modelcif.py b/projects/2024-12-ma-denv/translate2modelcif.py new file mode 100644 index 0000000000000000000000000000000000000000..daf668bf1087841f9974d9bb53d19c6f13df6761 --- /dev/null +++ b/projects/2024-12-ma-denv/translate2modelcif.py @@ -0,0 +1,2524 @@ +# -*- coding: utf-8 -*- + +"""Check & extend SWISS-MODEL models for ma-denv + +Example for running: +ToDo: Example call + +ToDo: Update expected output once script is done +Expected output in ./modelcif for example above: +- ma-taas-0272.cif as ModelCIF file +- ma-taas-0272.zip with accompanying data +- ma-taas-0272-image.png as image to use in ModelArchive +- ma-taas-0272-issues.json listing issues for that conversion (if any) +""" + +import argparse +import os +import sys + +# ToDo: NEEDED? import datetime +# ToDo: NEEDED? import gzip +# ToDo: NEEDED? import shutil +# ToDo: NEEDED? import zipfile +# ToDo: NEEDED? import pickle +# ToDo: NEEDED? import re +# ToDo: NEEDED? import traceback + +# ToDo: NEEDED? from io import StringIO +# ToDo: NEEDED? from timeit import default_timer as timer +# ToDo: NEEDED? import numpy as np +# ToDo: NEEDED? import requests +# ToDo: NEEDED? import ujson as json + +# ToDo: NEEDED? import ihm +# ToDo: NEEDED? import ihm.citations + +# ToDo: NEEDED? import modelcif +# ToDo: NEEDED? import modelcif.associated +# ToDo: NEEDED? import modelcif.dumper +# ToDo: NEEDED? import modelcif.model +# ToDo: NEEDED? import modelcif.protocol +# ToDo: NEEDED? import modelcif.reference +# ToDo: NEEDED? import gemmi + +# ToDo: NEEDED? import pandas as pd +# ToDo: NEEDED? from ost import io, seq + + +################################################################################ +# GENERAL HELPER FUNCTIONS +################################################################################ +def _abort_msg(msg, exit_code=1): + """Write error message and exit with exit_code.""" + print(f"{msg}\nAborting.", file=sys.stderr) + sys.exit(exit_code) + + +# def _warn_msg(msg): +# """Write a warning message to stdout.""" +# print(f"WARNING: {msg}") + + +def _check_file(file_path): + """Make sure a file exists and is actually a file.""" + if not os.path.exists(file_path): + _abort_msg(f"File not found: '{file_path}'.") + if not os.path.isfile(file_path): + _abort_msg(f"File path does not point to file: '{file_path}'.") + + +# def _check_folder(dir_path): +# """Make sure a file exists and is actually a file.""" +# if not os.path.exists(dir_path): +# _abort_msg(f"Path not found: '{dir_path}'.") +# if not os.path.isdir(dir_path): +# _abort_msg(f"Path does not point to a directory: '{dir_path}'.") + + +# def _check_opts_folder(dir_path): +# """Remove trailing '/' (return fixed one) and check if path valid.""" +# if dir_path.endswith("/"): +# dir_path = dir_path[:-1] +# _check_folder(dir_path) +# return dir_path + + +# def _get_res_num(r, use_auth=False): +# """Get res. num. from auth. IDs if reading from mmCIF files.""" +# if use_auth: +# return int(r.GetStringProp("pdb_auth_resnum")) +# return r.number.num + + +# def _get_ch_name(ch, use_auth=False): +# """Get chain name from auth. IDs if reading from mmCIF files.""" +# if use_auth: +# return ch.GetStringProp("pdb_auth_chain_name") +# return ch.name + + +# def _get_sequence(chn, use_auth=False): +# """Get the sequence out of an OST chain incl. '-' for gaps in resnums.""" +# # initialise (add gaps if first is not at num. 
1) +# lst_rn = _get_res_num(chn.residues[0], use_auth) +# idx = 1 +# sqe = "-" * (lst_rn - 1) + chn.residues[0].one_letter_code +# +# for res in chn.residues[idx:]: +# lst_rn += 1 +# while lst_rn != _get_res_num(res, use_auth): +# sqe += "-" +# lst_rn += 1 +# sqe += res.one_letter_code +# return sqe + + +# class ZipFileHandler: +# """ +# A class to handle ZIP files, including nested ZIP files, allowing for +# listing files and extracting or reading files on demand. +# File names are represented as tuples where all besides the last item +# are expected to be nested paths to ZIP files. + +# Attributes: +# zip_path (str): The path to the main ZIP file. +# file_list (list): List of tuples representing the hierarchical ... +# """ + +# def __init__(self, zip_path): +# """ +# Initializes the ZipFileHandler with the path to the main ZIP file. + +# Args: +# zip_path (str): The path to the main ZIP file. +# """ +# self.zip_path = zip_path +# self.file_list = self._build_file_list() + +# def _build_file_list(self, zip_file=None, base_path=()): +# """ +# Recursively builds a list representing the hierarchical file structure +# within the ZIP file. + +# Args: +# zip_file (zipfile.ZipFile, optional): The current ZIP file ... +# If None, the main ZIP file is processed. Default is None. +# base_path (tuple): The base path within the hierarchical ... + +# Returns: +# list of tuples representing the hierarchical file structure. +# """ +# file_list = [] +# if zip_file is None: +# with zipfile.ZipFile(self.zip_path, "r") as zip_ref: +# file_list.extend(self._build_file_list(zip_ref)) +# else: +# for file_info in zip_file.infolist(): +# file_name = file_info.filename +# if file_name.endswith("/"): +# continue # Skip directories +# if file_name.endswith(".zip"): +# with zip_file.open(file_name) as nested_zip: +# with zipfile.ZipFile(nested_zip) as nested_zip_ref: +# nested_base_path = base_path + (file_name,) +# file_list.extend( +# self._build_file_list( +# nested_zip_ref, nested_base_path +# ) +# ) +# else: +# file_list.append(base_path + (file_name,)) +# return file_list + +# def _read_all_files(self, zip_file=None, base_path=()): +# """ +# Recursively builds a list representing the hierarchical file structure +# within the ZIP file and reads all files. + +# Args: +# zip_file (zipfile.ZipFile, optional): The current ZIP file ... +# If None, the main ZIP file is processed. Default is None. +# base_path (tuple): The base path within the hierarchical ... + +# Returns: +# dictionary with key matching tuples in self.file_list and +# value being raw read content of file as in self.read_file. +# """ +# file_list = {} +# if zip_file is None: +# with zipfile.ZipFile(self.zip_path, "r") as zip_ref: +# file_list.update(self._read_all_files(zip_ref)) +# else: +# for file_name in zip_file.namelist(): +# if file_name.endswith("/"): +# continue # Skip directories +# if file_name.endswith(".zip"): +# with zip_file.open(file_name) as nested_zip: +# with zipfile.ZipFile(nested_zip) as nested_zip_ref: +# nested_base_path = base_path + (file_name,) +# file_list.update( +# self._read_all_files( +# nested_zip_ref, nested_base_path +# ) +# ) +# else: +# file_list[base_path + (file_name,)] = zip_file.open( +# file_name +# ).read() +# return file_list + +# def _zip_parser(self, current_zip, filename_tuple, callback): +# """ +# Hierarchical parsing of ZIP file through filename_tuple. +# Calls callback(zip_ref, name) at final nesting level with +# zip_ref being the open ZipFile and name the filename within it. 
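+#
+#         Example (illustrative sketch only; the archive name and member
+#         names below are hypothetical, not taken from a real deposition):
+#
+#             handler = ZipFileHandler("deposition.zip")
+#             # filename_tuple ("accompanying_data.zip", "pred_1.pdb") means:
+#             # descend into the nested accompanying_data.zip, then hand the
+#             # open ZipFile and "pred_1.pdb" to the callback.
+#             raw_pdb = handler.read_file(
+#                 ("accompanying_data.zip", "pred_1.pdb")
+#             )
+#
+#         read_file, extract_file and process_file all delegate to this
+#         helper.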
+# """ +# with zipfile.ZipFile(current_zip, "r") as zip_ref: +# name = filename_tuple[0] +# if len(filename_tuple) == 1: +# return callback(zip_ref, name) +# else: +# with zip_ref.open(name) as nested_zip: +# return self._zip_parser( +# nested_zip, filename_tuple[1:], callback +# ) + +# def extract_file(self, filename_tuple, extract_to="."): +# """ +# Extracts a specific file from the ZIP file based on the ... +# Note that the extracted file path will be according to ... +# within the extract_to folder. + +# Args: +# filename_tuple (tuple): A tuple representing the hierarchical ... +# extract_to (str): The directory to extract the file to. ... + +# Raises: +# FileNotFoundError: If the specified file is not found in the ... +# """ +# self._zip_parser( +# self.zip_path, +# filename_tuple, +# lambda zip_ref, name: zip_ref.extract(name, path=extract_to), +# ) + +# def extract_file_to_path(self, filename_tuple, output_path): +# """ +# ... +# """ + +# def _extract_to_path(zip_ref, name): +# with zip_ref.open(name) as source, open( +# output_path, "wb" +# ) as target: +# shutil.copyfileobj(source, target) + +# self._zip_parser(self.zip_path, filename_tuple, _extract_to_path) + +# def read_file(self, filename_tuple): +# """ +# Reads and returns the content of a specific file from the ZIP ... + +# Args: +# filename_tuple (tuple): A tuple representing the hierarchical ... + +# Returns: +# bytes: The content of the specified file. + +# Raises: +# FileNotFoundError: If the specified file is not found in the ... +# """ +# return self._zip_parser( +# self.zip_path, +# filename_tuple, +# lambda zip_ref, name: zip_ref.open(name).read(), +# ) + +# def read_all_files(self): +# """ +# Recursively builds a list representing the hierarchical file structure +# within the ZIP file and reads all files. + +# NOTE: for large zip files with data in nested zip file, this is much, +# much faster than calling read_file individually! + +# Returns: +# dictionary with key matching tuples in self.file_list and +# value being raw read content of file as in self.read_file. +# """ +# return self._read_all_files() + +# def process_file(self, filename_tuple, func): +# """ +# Opens a specific file from the ZIP file based on the hierarchical path +# and returns the result of func(fo) where fo is a file-like object. + +# Args: +# filename_tuple (tuple): A tuple representing the hierarchical ... +# func (function): Function to call on file-like object. + +# Returns: +# Result of func. + +# Raises: +# FileNotFoundError: If the specified file is not found in the ... +# """ +# return self._zip_parser( +# self.zip_path, +# filename_tuple, +# lambda zip_ref, name: func(zip_ref.open(name)), +# ) + + +################################################################################ + + +################################################################################ +# DATA HANDLING +################################################################################ +def _parse_args(): + """Parse command line arguments.""" + parser = argparse.ArgumentParser( + formatter_class=argparse.RawDescriptionHelpFormatter, + description=__doc__, + ) + + parser.add_argument( + "modelcif", + type=str, + metavar="<INPUT MODELCIF FILE>", + help="Path to a SWISS-MODEL ModelCIF file provided by depositors.", + ) + # ToDo: fill '?' + parser.add_argument( + "model_info_csv", + type=str, + metavar="<MODEL INFO CSV FILE>", + help="Path to a CSV file with information about each model. 
Format: " + + "<FILENAME>,<UNP AC>,?,?,?,?,?", + ) + parser.add_argument( + "out_dir", + type=str, + metavar="<OUTPUT DIR>", + help="Path to directory to store results. The updated ModelCIF file " + + "will be stored in this directory. If the ModelCIF file already " + + "exists in the output directory, it will not be overwritten and an " + + "error thrown.", + ) + parser.add_argument( + "--unp-json", + type=str, + default="./unp_data.json", + help="UniProtKB entries data. Information for each UNP entry fetched " + + "will be stored here. Serves as a cache. To update (e.g. after an " + + "UNP release), just delete the file and it will be recreated.", + ) + parser.add_argument( + "--compress", + default=False, + action="store_true", + help="Compress ModelCIF file with gzip.", + ) + # ToDo: do we want checks-only? + # parser.add_argument( + # "--checks-only", + # default=False, + # action="store_true", + # help="Only check for issues without producing ModelCIF files.", + # ) + opts = parser.parse_args() + + # check input + _check_file(opts.modelcif) + _check_file(opts.model_info_csv) + if opts.out_dir.endswith("/"): + opts.out_dir = opts.out_dir[:-1] + if not os.path.exists(opts.out_dir): + os.makedirs(opts.out_dir, exist_ok=True) + # check if the ModelCIF file already exists in out_dir + out_file = os.path.join( + opts.out_dir, + f"{os.path.basename(opts.modelcif)}{'.gz' if opts.compress else ''}", + ) + if os.path.exists(out_file): + _abort_msg(f"'{out_file}' already exists, will *not* be overwritten.") + # check if UNP is a file if it already exists + if os.path.exists(opts.unp_json): + _check_file(opts.unp_json) + + return opts + + +# def _get_audit_authors(): +# """Return the list of authors that produced this model.""" +# return ( +# "Dobbelstein, Adrian", +# "Alva, Vikram", +# ) + + +# def _get_file_list_dict(file_list): +# """ +# Get dictionary for access to files for conversion. + +# Args: +# file_list (list): As returned by ZipFileHandler. 
+ +# Returns: +# dict with following keys / values: +# - "mdl_type" / "assembly" or "domain" +# - "json_file" / filename_tuple for access in ZipFileHandler +# - "cif_file" / filename_tuple for access in ZipFileHandler +# - "image_file" / filename_tuple for access in ZipFileHandler +# - "accompanying_data" / dict with +# filename without path as key and +# filename_tuple for access in ZipFileHandler as value +# - "unparsed_files" / list of filename_tuple +# """ +# result = { +# "mdl_type": None, +# "json_file": None, +# "cif_file": None, +# "image_file": None, +# "accompanying_data": {}, +# "unparsed_files": [], +# } +# for filename_tuple in file_list: +# basename = os.path.basename(filename_tuple[-1]) +# fs = filename_tuple[0].split(os.path.sep) +# if fs[-2] == "accompanying_data" or fs[-1] == "accompanying_data.zip": +# assert basename not in result["accompanying_data"] +# result["accompanying_data"][basename] = filename_tuple +# elif basename == "assembly_info.json": +# assert result["mdl_type"] in [None, "assembly"] +# assert result["json_file"] is None +# result["mdl_type"] = "assembly" +# result["json_file"] = filename_tuple +# elif basename == "assembly.cif": +# assert result["mdl_type"] in [None, "assembly"] +# assert result["cif_file"] is None +# result["mdl_type"] = "assembly" +# result["cif_file"] = filename_tuple +# elif basename == "domain_info.json": +# assert result["mdl_type"] in [None, "domain"] +# assert result["json_file"] is None +# result["mdl_type"] = "domain" +# result["json_file"] = filename_tuple +# elif basename == "domain.cif": +# assert result["mdl_type"] in [None, "domain"] +# assert result["cif_file"] is None +# result["mdl_type"] = "domain" +# result["cif_file"] = filename_tuple +# elif basename == "image.png": +# assert result["image_file"] is None +# result["image_file"] = filename_tuple +# else: +# result["unparsed_files"].append(filename_tuple) +# for k, v in result.items(): +# has_content = bool(v) +# if k != "unparsed_files" and not has_content: +# raise RuntimeError(f"{k} not set in zip file.") +# return result + + +# def _get_zip_content(zip_file_path): +# """Same as _get_file_list_dict but reads zip file content.""" +# zip_handler = ZipFileHandler(zip_file_path) +# zip_dict = _get_file_list_dict(zip_handler.file_list) +# zip_content = zip_handler.read_all_files() +# for k, v in zip_dict.items(): +# if k == "accompanying_data": +# v = {ak: zip_content[av] for ak, av in v.items()} +# elif k in ["json_file", "cif_file", "image_file"]: +# v = zip_content[v] +# zip_dict[k] = v +# return zip_dict + + +# def _get_modeller_software(version=None): +# """Get MODELLER as a dictionary, suitable to create a modelcif software +# object.""" +# # Note: depositor had suggested use of Webb and Sali 2016 but decided +# # against in given input from Ben Webb (1993 citation is still the ... +# # one: https://salilab.org/modeller/manual/node8.html). 
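+#     # Usage sketch (assumption: modelcif.Software accepts the citation
+#     # keyword inherited from ihm.Software; see _assemble_modelcif_software
+#     # further below for the conversion actually used):
+#     #   sw_dict = _get_modeller_software(version="10.4")
+#     #   sw = modelcif.Software(
+#     #       sw_dict["name"], sw_dict["classification"],
+#     #       sw_dict["description"], sw_dict["location"],
+#     #       sw_dict["type"], sw_dict["version"],
+#     #       citation=sw_dict["citation"],
+#     #   )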
+# return { +# "name": "MODELLER", +# "classification": "model building", +# "description": "Comparative modeling by satisfaction of spatial " +# "restraints", +# "citation": ihm.citations.modeller, +# "location": "https://salilab.org/modeller/", +# "type": "program", +# "version": version, +# } + + +# # global definition to avoid duplicated entries in ModelCIF +# ihm.citations.af2_multimer = ihm.Citation( +# pmid=None, +# title="Protein complex prediction with AlphaFold-Multimer.", +# journal="bioRxiv", +# volume=None, +# page_range=None, +# year=2021, +# authors=[ +# "Evans, R.", +# "O'Neill, M.", +# "Pritzel, A.", +# "Antropova, N.", +# "Senior, A.", +# "Green, T.", +# "Zidek, A.", +# "Bates, R.", +# "Blackwell, S.", +# "Yim, J.", +# "Ronneberger, O.", +# "Bodenstein, S.", +# "Zielinski, M.", +# "Bridgland, A.", +# "Potapenko, A.", +# "Cowie, A.", +# "Tunyasuvunakool, K.", +# "Jain, R.", +# "Clancy, E.", +# "Kohli, P.", +# "Jumper, J.", +# "Hassabis, D.", +# ], +# doi="10.1101/2021.10.04.463034", +# ) + + +# def _get_af2_software(version=None, is_multimer=False): +# """Get AF2 as dictionary, suitable to create a modelcif software ...""" +# if is_multimer: +# return { +# "name": "AlphaFold-Multimer", +# "classification": "model building", +# "description": "Structure prediction", +# "citation": ihm.citations.af2_multimer, +# "location": "https://github.com/deepmind/alphafold", +# "type": "package", +# "version": version, +# } +# else: +# return { +# "name": "AlphaFold", +# "classification": "model building", +# "description": "Structure prediction", +# "citation": ihm.citations.alphafold2, +# "location": "https://github.com/deepmind/alphafold", +# "type": "package", +# "version": version, +# } + + +# def _get_dssp_software(version=None): +# """Get DSSP SW as dictionary, suitable to create a modelcif software +# object.""" +# # NOTE: this is using the old repo and old citation (ok for v2.3) +# return { +# "name": "DSSP", +# "classification": "data collection", +# "description": "Secondary structure assignment", +# "citation": ihm.Citation( +# pmid="6667333", +# title="Dictionary of protein secondary structure: pattern " +# + "recognition of hydrogen-bonded and geometrical features.", +# journal="Biopolymers", +# volume=22, +# page_range=(2577, 2637), +# year=1983, +# authors=["Kabsch, W.", "Sander, C."], +# doi="10.1002/bip.360221211", +# ), +# "location": "https://github.com/cmbi/dssp", +# "type": "program", +# "version": version, +# } + + +# def _get_foldseek_software(version=None): +# """Get Foldseek SW as dictionary, suitable to create a modelcif software +# object.""" +# return { +# "name": "Foldseek", +# "classification": "data collection", +# "description": "Protein structure search", +# "citation": ihm.Citation( +# pmid="37156916", +# title="Fast and accurate protein structure search with Foldseek.", +# journal="Nat Biotechnol", +# volume=42, +# page_range=(243, 246), +# year=2024, +# authors=[ +# "van Kempen, M.", +# "Kim, S.S.", +# "Tumescheit, C.", +# "Mirdita, M.", +# "Lee, J.", +# "Gilchrist, C.L.M.", +# "Soeding, J.", +# "Steinegger, M.", +# ], +# doi="10.1038/s41587-023-01773-0", +# ), +# "location": "https://search.foldseek.com/search", +# "type": "package", +# "version": version, +# } + + +# def _get_sam_cc_software(version=None): +# """Get SamCC-Turbo SW as dictionary, suitable to create a modelcif ... 
+# object.""" +# return { +# "name": "SamCC-Turbo", +# "classification": "data collection", +# "description": "Detection and measurement of coiled coils", +# "citation": ihm.Citation( +# pmid="33325494", +# title="A library of coiled-coil domains: from regular bundles to " +# + "peculiar twists.", +# journal="Bioinformatics", +# volume=36, +# page_range=(5368, 5376), +# year=2020, +# authors=[ +# "Szczepaniak, K.", +# "Bukala, A.", +# "da Silva Neto, A.M.", +# "Ludwiczak, J.", +# "Dunin-Horkawicz, S.", +# ], +# doi="10.1093/bioinformatics/btaa1041", +# ), +# "location": "https://github.com/labstructbioinf/samcc_turbo", +# "type": "package", +# "version": version, +# } + + +# def _get_us_align_software(version=None): +# """Get US-align SW as dictionary, suitable to create a modelcif software +# object.""" +# return { +# "name": "US-align", +# "classification": "data collection", +# "description": "Universal Structural alignment of macromolecules", +# "citation": ihm.Citation( +# pmid="36038728", +# title="US-align: universal structure alignments of proteins, " +# + "nucleic acids, and macromolecular complexes.", +# journal="Nat Methods", +# volume=19, +# page_range=(1109, 1115), +# year=2022, +# authors=["Zhang, C.", "Shine, M.", "Pyle, A.M.", "Zhang, Y."], +# doi="10.1038/s41592-022-01585-1", +# ), +# "location": "https://zhanggroup.org/US-align/", +# "type": "package", +# "version": version, +# } + + +# def _get_af2_description(pred): +# """Get description text based on parameters used.""" +# if pred["version"] == "2.1.2": +# num_models_str = "producing 5 models with 3 recycles" +# elif pred["version"] == "2.3.1": +# num_models_str = ( +# "producing 10 models (2 random seeds per parameter " +# "set) with up to 20 recycles" +# ) +# else: +# raise RuntimeError(f"Unexpected AF2 version: '{pred['version']}'") +# return ( +# f"using AlphaFold v{pred['version']} {num_models_str} and " +# f"{pred['num_ensemble']} ensemble each, with AMBER relaxation, " +# f"using templates up to date {pred['tmplate_cutoff_date']}, " +# f"ranked by ipTM*0.8+pTM*0.2, starting from MSAs with reduced_dbs " +# f"setting" +# ) + + +# def _get_af2_merge_dict(pred): +# """Get N/C-terminal padding lengths for artificial coiled-coils.""" +# pred_name = os.path.splitext(pred["prediction_p"])[0] +# ps = pred_name.split("_") +# pred_merge = {"N": 0, "C": 0} +# if "mergeN" in ps: +# pred_merge["N"] = int(ps[ps.index("mergeN") + 1]) +# if "mergeC" in ps: +# pred_merge["C"] = int(ps[ps.index("mergeC") + 1]) +# return pred_merge + + +# def _get_af2_range_string(pred): +# """Get string repr. of modelled range incl. coiled-coil padding.""" +# range_str = f"{pred['prediction_start']}-{pred['prediction_end']}" +# pred_merge = _get_af2_merge_dict(pred) +# if pred_merge["N"] or pred_merge["C"]: +# if pred_merge["N"] and pred_merge["C"]: +# seq_str = "sequences" +# else: +# seq_str = "sequence" +# pad_strs = [] +# if pred_merge["N"]: +# pad_strs.append(f"{pred_merge['N']} N-terminal residues") +# if pred_merge["C"]: +# pad_strs.append(f"{pred_merge['C']} C-terminal residues") +# pad_str = ", ".join(pad_strs) +# range_str += ( +# f" padded with artificial coiled-coil {seq_str} " +# f"({pad_str}) to improve oligomerization" +# ) +# return range_str + + +# def _get_preds_and_parts(json_dict, mdl_type): +# """Get list of AF2 pred. and trucated parts in a consistent way. +# For domains, the final model is the truncated part and gets "model" +# as truncated_prediction_path. 
+# """ +# if mdl_type == "assembly": +# af2_preds = json_dict["AF_predictions"] +# trunc_parts = json_dict["assembly_parts"] +# elif mdl_type == "domain": +# af2_pred = json_dict["source_AF_prediction"] +# af2_preds = [af2_pred] +# trunc_parts = [ +# { +# "part_start": json_dict["domain_sequence_start"], +# "part_end": json_dict["domain_sequence_end"], +# "truncated_prediction_path": "model", +# "source_prediction": os.path.splitext( +# af2_pred["prediction_p"] +# )[0], +# } +# ] +# else: +# raise RuntimeError(f"Unexpected mdl_type: '{mdl_type}'") +# # sort them by range +# af2_preds.sort( +# key=lambda pred: ( +# int(pred["prediction_start"]), +# int(pred["prediction_end"]), +# ) +# ) +# trunc_parts.sort(key=lambda part: (part["part_start"], part["part_end"])) +# return af2_preds, trunc_parts + + +# def _get_af2_files(af2_path): +# """Return names for PDB file, PKL file, and PAE file.""" +# af_base = os.path.splitext(af2_path)[0] +# return af2_path, f"{af_base}_info.pkl", f"PAE_{af_base}.png" + + +# def _get_protocol_steps_and_software(json_dict, mdl_type, mdl_id): +# """Create the list of protocol steps with software and parameters used.""" +# protocol = [] + +# # LOGIC: +# # - First steps are AF2 predictions with different settings +# # -> each setting has a number of ranges +# # -> if only one AF pred., the text gets simplified +# # - Next step: models truncated (and superposed) into parts +# # -> one step for all ranges +# # -> no superpos. if only one part +# # - For assemblies with more than one part: +# # -> MODELLER step +# # -> QE step combining pLDDT and MODELLER energy score +# # Note: sanity checks all done in _do_sanity_checks and not here! + +# # collect info on predictions and parts +# af2_preds, trunc_parts = _get_preds_and_parts(json_dict, mdl_type) + +# # group AF2 predictions by SW config +# af2_config_preds = {} # dict = version, tmplate_cutoff_date, num_ensemble +# for pred in af2_preds: +# config_tuple = ( +# pred["version"], +# pred["tmplate_cutoff_date"], +# pred["num_ensemble"], +# ) +# if config_tuple not in af2_config_preds: +# af2_config_preds[config_tuple] = [pred] +# else: +# af2_config_preds[config_tuple].append(pred) + +# # create AF2 steps +# for config_tuple in sorted(af2_config_preds): +# version, max_template_date, num_ensemble = config_tuple +# sw_params = { +# "model_preset": "multimer", +# "db_preset": "reduced_dbs", +# "max_template_date": max_template_date, +# "num_ensemble": num_ensemble, +# } +# if version == "2.3.1": +# sw_params["num_multimer_predictions_per_model"] = 2 +# sw_plus_params = [ +# (_get_af2_software(version=version, is_multimer=True), sw_params) +# ] +# # build up text +# cur_preds = af2_config_preds[config_tuple] +# ranges_txt = ", ".join( +# [_get_af2_range_string(pred) for pred in cur_preds] +# ) +# af_txt = _get_af2_description(cur_preds[0]) # all cur_preds the same +# # shortened text for single pred. +# if len(af2_preds) == 1: +# details = ( +# f"AlphaFold-Multimer prediction {af_txt} for range " +# f"{ranges_txt}." +# ) +# else: +# details = ( +# f"AlphaFold-Multimer prediction of overlapping, " +# f"partial sequences {af_txt}. The following residue " +# f"ranges were predicted: {ranges_txt}." 
+# ) +# # collect output files +# output = [] +# for pred in cur_preds: +# output.extend(_get_af2_files(pred["prediction_p"])) +# protocol.append( +# { +# "method_type": "modeling", +# "name": "AF2", +# "details": details, +# "input": [ +# f"target_sequences_{mdl_id}", +# f"ref_dbs_AF2_{version}", +# ], +# "output": output, +# "software_plus_params": sw_plus_params, +# } +# ) + +# # create truncation step +# ranges_txt = ", ".join( +# [f"{part['part_start']}-{part['part_end']}" for part in trunc_parts] +# ) +# if len(trunc_parts) == 1: +# details = f"Model truncated to predicted domain range {ranges_txt}." +# else: +# details = ( +# f"Models truncated and superposed based on the following " +# f"overlapping residue ranges: {ranges_txt}." +# ) +# trunc_data = [part["truncated_prediction_path"] for part in trunc_parts] +# # add MDL ID suffix if needed +# trunc_data = [ +# (f"model_{mdl_id}" if td == "model" else td) for td in trunc_data +# ] +# protocol.append( +# { +# "method_type": "other", +# "name": "Truncation", +# "details": details, +# "input": sorted( +# set( +# [ +# f"{part['source_prediction']}.pdb" +# for part in trunc_parts +# ] +# ) +# ), +# "output": trunc_data, +# "software_plus_params": [], +# } +# ) + +# # create other steps to combine truncated parts +# if len(trunc_parts) > 1: +# # MODELLER step +# protocol.append( +# { +# "method_type": "modeling", +# "name": "MODELLER", +# "details": "Assembly of the truncated models using MODELLER " +# 'executed using the "AutoModel" configuration.', +# "input": trunc_data, +# "output": [f"model_{mdl_id}"], +# "software_plus_params": [(_get_modeller_software("10.4"), +# {})], +# } +# ) +# # QE step +# protocol.append( +# { +# "method_type": "other", +# "name": "QE", +# "details": "Calculation of combined per-residue confidence " +# "metric (range 0-100, higher is better) based on " +# "per-residue AlphaFold confidence score (pLDDT) " +# "(range 0-100, higher is better, for residues with " +# "overlapping predictions, the higher pLDDT score is " +# "used) and per-residue MODELLER energy score (>0, " +# "lower is better) using the following formula: " +# "confidence = clip(0,100, pLDDT - 60 / (1 + exp((150 " +# "- energy)/20))). This score reduces the pLDDT score " +# "by max. 60 (for energy values >> 150). For energy " +# "values < 100, the score approximately follows the " +# "pLDDT score.", +# "input": [f"model_{mdl_id}"], +# "output": [f"model_{mdl_id}"], +# "software_plus_params": [], +# } +# ) +# if "software_used_for_domain_annotation" in json_dict: +# # domain annotation step +# protocol.append( +# { +# "method_type": "other", +# "name": "domain annotation", +# "details": "..." +# "the following tools: DSSP, SamCC-Turbo, Foldseek, " +# "US-align.", +# "input": [f"model_{mdl_id}"], +# "output": [f"model_{mdl_id}"], +# "software_plus_params": [ +# (_get_dssp_software("2.3.0"), {}), +# (_get_sam_cc_software(), {}), +# (_get_foldseek_software(), {}), +# (_get_us_align_software(), {}), +# ], +# } +# ) + +# return protocol + + +# def _process_ent(ent): +# """Helper to process OST entities for sanity checks. +# Returns: +# - ch_names: list of chain names in order as appearing in file +# - mdlsqe: atomseq (no gaps) for model +# -> incl. assertion that all chains have same seq. +# - resnum_range: (min. res. num., max. res. num.) +# -> incl. assertion that res. num. are continuous and w/o gaps +# - ent_bfs: b-factor for each residue +# -> incl. assertion that all atoms of res. 
have same +# -> list of length len(ch_names) * len(mdlsqe) +# """ +# # chain names in order +# ch_names = [ch.name for ch in ent.chains] +# # sequence +# unique_atomseqs = sorted( +# set( +# [ +# "".join(res.one_letter_code for res in ch.residues) +# for ch in ent.chains +# ] +# ) +# ) +# assert len(unique_atomseqs) == 1 +# mdlsqe = unique_atomseqs[0] +# # res. nums (required to be continuous) +# resnum_ranges = [] +# for ch in ent.chains: +# res_nums = [res.number.num for res in ch.residues] +# assert res_nums == list(range(min(res_nums), max(res_nums) + 1)) +# resnum_ranges.append((min(res_nums), max(res_nums))) +# unique_resnum_ranges = sorted(set(resnum_ranges)) +# assert len(unique_resnum_ranges) == 1 +# resnum_range = unique_resnum_ranges[0] +# # b-factors +# ent_bfs = [] +# for res in ent.residues: +# b_factors = [a.b_factor for a in res.atoms] +# assert len(set(b_factors)) == 1 # must all be equal! +# ent_bfs.append(b_factors[0]) +# assert len(ent_bfs) == len(ch_names) * len(mdlsqe) +# return ch_names, mdlsqe, resnum_range, ent_bfs + + +# def _load_cif_file(zip_dict): +# """Read CIF file for given entry. +# Returns OST entity and dictionary of categories to put back into resulting +# ModelCIF file with gemmi (categories not supported by python-modelcif). +# """ +# # fetch file and fix for OST +# cif_str = zip_dict["cif_file"] +# doc = gemmi.cif.read_string(cif_str) +# block = doc.sole_block() +# cat_dict = block.get_mmcif_category("_struct_ref.") +# if cat_dict and "db_code" not in cat_dict: +# if "pdbx_db_accession" in cat_dict: +# cat_dict["db_code"] = cat_dict["pdbx_db_accession"] +# else: +# lengths = [len(v) for v in cat_dict.values()] +# assert len(set(lengths)) == 1 +# cat_dict["db_code"] = [""] * lengths[0] +# block.set_mmcif_category("_struct_ref.", cat_dict) +# # kept mmCIF data: to check consistency later +# kept_cif_cats = {} +# for cat in [ +# "_pdbx_domain.", +# "_pdbx_feature_domain.", +# "_pdbx_domain_range.", +# ]: +# cat_dict = block.get_mmcif_category(cat) +# if cat_dict: +# kept_cif_cats[cat] = cat_dict +# # fix data +# if len(kept_cif_cats) == 3: +# # fix IDs if possible +# pd_ids = kept_cif_cats["_pdbx_domain."]["id"] +# pfd_ids = kept_cif_cats["_pdbx_feature_domain."]["id"] +# pfd_dom_ids = kept_cif_cats["_pdbx_feature_domain."]["domain_id"] +# pdr_dom_ids = kept_cif_cats["_pdbx_domain_range."]["domain_id"] +# if ( +# pdr_dom_ids != pd_ids +# and pfd_ids == pd_ids +# and pfd_dom_ids == pdr_dom_ids +# ): +# kept_cif_cats["_pdbx_feature_domain."]["domain_id"] = pd_ids +# kept_cif_cats["_pdbx_domain_range."]["domain_id"] = pd_ids +# # fix characters in expected places +# exp_cat_items = [ +# "_pdbx_domain.details", +# "_pdbx_feature_domain.feature", +# ] +# for cat, cat_dict in kept_cif_cats.items(): +# for item in cat_dict.keys(): +# if f"{cat}{item}" in exp_cat_items: +# cat_dict[item] = [ +# value.replace("°-", " degree ") +# .replace("°", " degree") +# .replace("αβ", "alpha/beta") +# .replace("α", "alpha") +# .replace("β", "beta") +# .replace("“", '"') +# .replace("→", " to ") +# for value in cat_dict[item] +# ] +# # get OST ent. 
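+#     # (io.MMCifStrToEntity is unpacked into three values below; only the
+#     # entity itself is used further, `info` and `ss` are ignored, and the
+#     # gemmi categories collected above are returned alongside the entity.)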
+# cif_ent, info, ss = io.MMCifStrToEntity( +# doc.as_string(), profile=io.profiles["DEFAULT"], process=True +# ) +# return cif_ent, kept_cif_cats + + +# def _do_sanity_checks(mdl_id, json_dict, zip_dict, cif_ent, kept_cif_cats): +# """Check if everything in order and return issues for weird cases.""" +# issues = [] + +# # get some general info depending on model type +# mdl_type = zip_dict["mdl_type"] +# plddt_len = len(json_dict["pLDDT"]) +# if mdl_type == "assembly": +# mdl_seq = json_dict["predicted_sequence"] +# seq_range = ( +# json_dict["predicted_sequence_start"], +# json_dict["predicted_sequence_end"], +# ) +# ent_desc = json_dict["source_sequence_description"] +# assert "software_used_for_domain_annotation" in json_dict +# assert len(json_dict["per_res_conf"]) == plddt_len +# else: +# assert "predicted_sequence" not in json_dict +# mdl_seq = json_dict["domain_sequence"] +# seq_range = ( +# json_dict["domain_sequence_start"], +# json_dict["domain_sequence_end"], +# ) +# label = f'domain/motif "{json_dict["domain_name"]}"' +# assert label in json_dict["abstract"] +# ent_desc = f'{json_dict["source_sequence_description"]} ({label})' +# assert "merge" not in \ +# json_dict["source_AF_prediction"]["prediction_p"] +# assert "software_used_for_domain_annotation" not in json_dict +# assert "per_res_conf" not in json_dict + +# # global checks +# assert sum((c in "XOUBJZ") for c in mdl_seq) == 0 +# src_seq = json_dict["source_sequence"] +# assert mdl_seq == src_seq[seq_range[0] - 1 : seq_range[1]] +# if len(mdl_seq) != plddt_len: +# short_data = (len(mdl_seq), plddt_len) +# long_data = (mdl_seq, json_dict["pLDDT"]) +# issues.append(("plddt_size_mismatch", short_data, long_data)) +# assert seq_range[1] > seq_range[0] +# if seq_range[1] > len(src_seq): +# short_data = (seq_range, len(src_seq)) +# issues.append(("range_source_mismatch", short_data, [])) +# for key in ["percent_confident_residues", "mean_per_res_conf"]: +# assert key in json_dict +# extra_keys = set(json_dict.keys()) - set( +# [ +# "AF_predictions", +# "MODELLER_energy_score", +# "abstract", +# "assembly_parts", +# "domain_name", +# "domain_sequence", +# "domain_sequence_end", +# "domain_sequence_start", +# "mean_per_res_conf", +# "pLDDT", +# "per_res_conf", +# "percent_confident_residues", +# "predicted_sequence", +# "predicted_sequence_end", +# "predicted_sequence_start", +# "software_used_for_domain_annotation", +# "software_used_for_prediction", +# "source_AF_prediction", +# "source_sequence", +# "source_sequence_RefSeq_ID", +# "source_sequence_description", +# "title", +# "source_sequence_download_date", +# "domain_id", +# "coiled_coil_annot", +# ] +# ) +# if len(extra_keys) > 0: +# issues.append(("extra_keys", sorted(extra_keys), [])) +# # unused/unknown coiled_coil_annot (expect to be None if there) +# assert json_dict.get("coiled_coil_annot") is None + +# # for valid mmCIF... +# mmcif_regex_ent = "[][ \t_(),.;:\"&<>/\\\\{}'`~!@#$%?+=*A-Za-z0-9|^-]*" +# mmcif_regex_desc = "[][ \n\t()_,.;:\"&<>/\\\\{}'`~!@#$%?+=*A-Za-z0-9|^-]*" +# assert bool(re.fullmatch(mmcif_regex_ent, ent_desc)) +# assert bool(re.fullmatch(mmcif_regex_desc, json_dict["title"])) +# assert bool(re.fullmatch(mmcif_regex_desc, json_dict["abstract"])) + +# # collect info on predictions and parts +# af2_preds, trunc_parts = _get_preds_and_parts(json_dict, mdl_type) + +# # check AF pred. 
+# exp_pred_keys = set( +# [ +# "prediction_start", +# "prediction_end", +# "prediction_p", +# "version", +# "tmplate_cutoff_date", +# "num_ensemble", +# "pTM", +# "ipTM", +# ] +# ) +# opt_pred_keys = set(["merged_GCN4linker_len_N", +# "merged_GCN4linker_len_C"]) +# pred_ranges = {} +# sw_kws_to_check = set() +# for pred in af2_preds: +# # check keys +# extra_keys = set(pred.keys()) - exp_pred_keys - opt_pred_keys +# assert len(extra_keys) == 0 +# noncovered_keys = exp_pred_keys - pred.keys() +# assert len(noncovered_keys) == 0 +# assert pred["version"] in pred["prediction_p"] +# # keep track of files and ranges for checks of parts +# pred_name = os.path.splitext(pred["prediction_p"])[0] +# assert pred_name not in pred_ranges +# pred_range = ( +# int(pred["prediction_start"]), +# int(pred["prediction_end"]), +# ) +# pred_ranges[pred_name] = pred_range +# # keep track of AF2 versions +# sw_kws_to_check.add(("AlphaFold", pred["version"])) +# # check merge stuff +# pred_merge = _get_af2_merge_dict(pred) +# dict_pred_merge = {"N": 0, "C": 0} +# if "merged_GCN4linker_len_N" in pred.keys(): +# dict_pred_merge["N"] = pred["merged_GCN4linker_len_N"] +# if "merged_GCN4linker_len_C" in pred.keys(): +# dict_pred_merge["C"] = pred["merged_GCN4linker_len_C"] +# assert pred_merge == dict_pred_merge +# # check acc. file content +# pdb_file, pkl_file, pae_file = _get_af2_files(pred["prediction_p"]) +# pdb_ent = io.PDBStrToEntity( +# zip_dict["accompanying_data"][pdb_file], +# profile=io.profiles["DEFAULT"], +# process=True, +# ) +# pkl_data = pickle.loads(zip_dict["accompanying_data"][pkl_file]) +# (pdb_ch_names, pdb_mdlsqe, pdb_resnum_range, pdb_ent_bfs) = ( +# _process_ent(pdb_ent) +# ) +# # pdb_ch_names can have random order and we don't care... +# # we ignore pdb_resnum_range and just check seq. 
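+#         # e.g. (hypothetical numbers) with pred_merge == {"N": 20, "C": 0}
+#         # and a 270-residue pdb_mdlsqe, the slice below strips the 20
+#         # N-terminal artificial coiled-coil linker residues so that the
+#         # remaining 250 residues can be compared to the source sequence
+#         # range of this prediction.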
+# exp_mdlsqe = src_seq[pred_range[0] - 1 : pred_range[1]] +# cut_end = len(pdb_mdlsqe) - pred_merge["C"] +# pdb_mdlsqe_cut = pdb_mdlsqe[pred_merge["N"] : cut_end] +# assert pdb_mdlsqe_cut == exp_mdlsqe +# # check QE +# exp_qe_len = len(pdb_ch_names) * len(pdb_mdlsqe) +# assert len(pkl_data["pLDDT"]) == exp_qe_len +# assert len(pkl_data["PAE"]) == exp_qe_len +# assert len(pkl_data["PAE"][0]) == exp_qe_len +# assert "pTM" in pkl_data +# assert "ipTM" in pkl_data +# qe_max_diff = np.max(abs(np.asarray(pkl_data["pLDDT"]) - pdb_ent_bfs)) +# if qe_max_diff > 0.01: +# # 2nd option: chain names in PDB were reordered compared to pkl +# bfs_arr = np.asarray(pdb_ent_bfs).reshape(len(pdb_ch_names), -1) +# ch_indices = [pdb_ch_names.index(ch) for ch in ["A", "C", "B"]] +# bfs_new = bfs_arr[ch_indices, :].flatten() +# qe_max_diff = np.max(abs(np.asarray(pkl_data["pLDDT"]) - bfs_new)) +# if qe_max_diff > 0.01: +# short_data = [qe_max_diff, pkl_file] +# long_data = [list(pkl_data["pLDDT"]), pdb_ent_bfs] +# issues.append(("pkl_plddt_diff", short_data, long_data)) +# # check redundant data +# assert str(pred["prediction_start"]) == pkl_data["pred_start"] +# assert str(pred["prediction_end"]) == pkl_data["pred_end"] +# assert pred["tmplate_cutoff_date"] == pkl_data["tmpl_max_date"] +# assert pred["version"] == pkl_data["version"] +# assert pred["num_ensemble"] == pkl_data["num_ensemble"] +# if pred["pTM"] != pkl_data["pTM"]: +# long_data = [pred["pTM"], float(pkl_data["pTM"])] +# short_data = [abs(long_data[0] - long_data[1]), pkl_file] +# issues.append(("pkl_ptm_diff", short_data, long_data)) +# if pred["ipTM"] != pkl_data["ipTM"]: +# long_data = [pred["ipTM"], float(pkl_data["ipTM"])] +# short_data = [abs(long_data[0] - long_data[1]), pkl_file] +# issues.append(("pkl_iptm_diff", short_data, long_data)) +# if "merged_GCN4linker_len_N" in pred: +# if "merge_len_N" in pkl_data.keys(): +# other_len = pkl_data["merge_len_N"] +# else: +# # HACK for 0898 +# assert "merge_len" in pkl_data.keys() +# other_len = pkl_data["merge_len"] +# assert pred["merged_GCN4linker_len_N"] == other_len +# if "merged_GCN4linker_len_C" in pred: +# assert pred["merged_GCN4linker_len_C"] == pkl_data["merge_len_C"] +# # check expected fixed data +# assert pkl_data["max_msa_size"] == 10000 +# assert pkl_data["db_preset"] == "reduced_dbs" +# assert pkl_data["amber_relax"] == True + +# # check truncated parts +# if len(trunc_parts) > 1: +# assert len(json_dict["MODELLER_energy_score"]) == plddt_len +# sw_kws_to_check.add(("MODELLER", "10.4")) +# else: +# assert "MODELLER_energy_score" not in json_dict +# exp_part_keys = sorted( +# [ +# "part_start", +# "part_end", +# "truncated_prediction_path", +# "source_prediction", +# ] +# ) +# part_names = [] +# source_predictions = [] +# for part in trunc_parts: +# # check metadata +# assert sorted(part.keys()) == exp_part_keys +# source_predictions.append(part["source_prediction"]) +# part_names.append(part["truncated_prediction_path"]) +# pred_range = pred_ranges[part["source_prediction"]] +# assert part["part_start"] >= pred_range[0] +# assert part["part_end"] <= pred_range[1] +# # check acc. 
file content +# part_path = part["truncated_prediction_path"] +# if part_path != "model": +# pdb_ent = io.PDBStrToEntity( +# zip_dict["accompanying_data"][part_path], +# profile=io.profiles["DEFAULT"], +# process=True, +# ) +# (pdb_ch_names, pdb_mdlsqe, pdb_resnum_range, pdb_ent_bfs) = ( +# _process_ent(pdb_ent) +# ) +# part_range = (part["part_start"], part["part_end"]) +# assert pdb_resnum_range == part_range # not really important +# exp_mdlsqe = src_seq[part_range[0] - 1 : part_range[1]] +# assert exp_mdlsqe == pdb_mdlsqe + +# # check CIF file +# (cif_ch_names, cif_mdlsqe, cif_resnum_range, cif_ent_bfs) = _process_ent( +# cif_ent +# ) +# assert seq_range == cif_resnum_range # NOTE: critical for kept_cif_cats +# assert cif_mdlsqe == mdl_seq +# if cif_ch_names != ["A", "B", "C"]: +# issues.append(("cif_ch_names", cif_ch_names, [])) +# # check b-factors (assume to match average or first chain) +# if mdl_id == "ma-taas-2867": +# # known to have faulty b-factors in CIF file; should use ones from PDB +# bfs_arr = np.asarray(pdb_ent_bfs).reshape(len(cif_ch_names), -1) +# bfs_avg = bfs_arr[0, :] +# else: +# bfs_arr = np.asarray(cif_ent_bfs).reshape(len(cif_ch_names), -1) +# bfs_avg = bfs_arr.mean(axis=0) +# assert len(bfs_avg) == len(cif_mdlsqe) +# if "per_res_conf" in json_dict: +# # for assemblies +# jd_sc = json_dict["per_res_conf"] +# max_diff = np.max(abs(bfs_avg - jd_sc)) +# if max_diff >= 0.01: +# long_data = [bfs_avg.tolist(), jd_sc] +# issues.append(("per_res_conf", max_diff, long_data)) +# else: +# # for domains +# jd_sc = json_dict["pLDDT"] +# max_diff = np.max(abs(bfs_arr[0, :] - jd_sc)) +# # b-factors known to be faulty for some models... +# if max_diff >= 0.01 and mdl_id[-4:] not in ["9293", "9344"]: +# long_data = [bfs_arr[0, :].tolist(), jd_sc] +# issues.append(("cif_b_factors", max_diff, long_data)) + +# # make sure prediction covers everything +# min_start = min(part["part_start"] for part in trunc_parts) +# max_end = max(part["part_end"] for part in trunc_parts) +# if min_start != seq_range[0] or max_end != seq_range[1]: +# short_data = [(min_start, max_end), seq_range] +# issues.append(("pred_range_mismatch", short_data, [])) +# assert len(set(part_names)) == len(trunc_parts) +# assert sorted(set(source_predictions)) == sorted(pred_ranges) + +# # all files there? +# exp_files = [] +# for pred in af2_preds: +# exp_files.extend(_get_af2_files(pred["prediction_p"])) +# for part in trunc_parts: +# if part["truncated_prediction_path"] != "model": +# exp_files.append(part["truncated_prediction_path"]) +# acc_files = sorted(zip_dict["accompanying_data"].keys()) +# assert len(set(exp_files) - set(acc_files)) == 0 +# extra_files = set(acc_files) - set(exp_files) +# if len(extra_files) > 0: +# long_data = [sorted(exp_files), acc_files] +# issues.append(("extra_acc_files", sorted(extra_files), long_data)) + +# # check SW +# sw_checked = set() +# claimed_sw = json_dict["software_used_for_prediction"] +# for kw_to_check in sw_kws_to_check: +# matching_sw = [ +# sw for sw in claimed_sw if all(kw in sw for kw in kw_to_check) +# ] +# assert len(matching_sw) == 1 +# sw_checked.add(matching_sw[0]) +# assert sorted(sw_checked) == sorted(claimed_sw) +# if "software_used_for_domain_annotation" in json_dict: +# claimed_sw = json_dict["software_used_for_domain_annotation"] +# exp_sw = [ +# "DSSP 2.3.0 (Kabsch and Sander 1983)", +# "Foldseek (van Kempen et al. 2024)", +# "SamCC-Turbo (Szczepaniak et al. 2020)", +# "US-align (Zhang et al. 
2022)", +# ] +# assert sorted(claimed_sw) == exp_sw + +# # QE checks +# plddts = json_dict["pLDDT"] +# plddt_range = (min(plddts), max(plddts)) +# if plddt_range[0] < 0 or plddt_range[1] > 100: +# issues.append(("plddt_range_mismatch", plddt_range, plddts)) +# if "MODELLER_energy_score" in json_dict: +# energy = json_dict["MODELLER_energy_score"] +# if min(energy) <= 0: +# issues.append(("energy_range_mismatch", min(energy), energy)) +# if "per_res_conf" in json_dict: +# per_res_confs = json_dict["per_res_conf"] +# prc_range = (min(per_res_confs), max(per_res_confs)) +# if prc_range[0] < 0 or prc_range[1] > 100: +# issues.append(("prc_range_mismatch", prc_range, per_res_confs)) +# if ( +# "MODELLER_energy_score" not in json_dict +# and "per_res_conf" in json_dict +# ): +# v1 = np.asarray(plddts) +# v2 = np.asarray(per_res_confs) +# qe_max_diff = np.max(abs(v1 - v2)) +# if qe_max_diff > 0.05: +# long_data = [plddts, per_res_confs] +# issues.append(("prc_plddt_diff", qe_max_diff, long_data)) + +# # check domains +# if mdl_type == "assembly": +# exp_num_kept_cif_cats = 3 +# else: +# exp_num_kept_cif_cats = 0 +# if exp_num_kept_cif_cats != len(kept_cif_cats): +# short_data = (exp_num_kept_cif_cats, sorted(kept_cif_cats.keys())) +# issues.append(("num_kept_cif_cats", short_data, kept_cif_cats)) +# # check categories +# if len(kept_cif_cats) == 3: +# # the following should all match after fixes appled in _load_cif_file +# pd_ids = sorted(kept_cif_cats["_pdbx_domain."]["id"]) +# if len(set(pd_ids)) != len(pd_ids): +# issues.append(("non_unique_pd_ids", [], pd_ids)) +# pfd_ids = kept_cif_cats["_pdbx_feature_domain."]["id"] +# if len(set(pfd_ids)) != len(pfd_ids): +# issues.append(("non_unique_pfd_ids", [], pfd_ids)) +# pfd_dom_ids = sorted( +# kept_cif_cats["_pdbx_feature_domain."]["domain_id"] +# ) +# if pd_ids != pfd_dom_ids: +# issues.append(("pfd_dom_ids", [], [pd_ids, pfd_dom_ids])) +# pdr_dom_ids = sorted( +# kept_cif_cats["_pdbx_domain_range."]["domain_id"]) +# if pd_ids != pdr_dom_ids: +# issues.append(("pdr_dom_ids", [], [pd_ids, pdr_dom_ids])) +# # special characters? +# for cat, cat_dict in kept_cif_cats.items(): +# for item, values in cat_dict.items(): +# for value in values: +# # I use the most permissive one +# if not re.fullmatch(mmcif_regex_desc, str(value)): +# invalid_chars = re.sub( +# mmcif_regex_desc, "", str(value) +# ) +# issues.append( +# ( +# "cif_invalid_chars", +# (f"{cat}{item}", invalid_chars), +# value, +# ) +# ) +# # matches the range? +# cat_dict = kept_cif_cats["_pdbx_domain_range."] +# res_list = list( +# zip( +# cat_dict["beg_label_asym_id"], +# cat_dict["beg_label_comp_id"], +# cat_dict["beg_label_seq_id"], +# ) +# ) + list( +# zip( +# cat_dict["end_label_asym_id"], +# cat_dict["end_label_comp_id"], +# cat_dict["end_label_seq_id"], +# ) +# ) +# for ch_name, res_name, res_num in res_list: +# res = cif_ent.FindResidue(ch_name, int(res_num)) +# if not res.IsValid() or not res.name == res_name: +# issues.append( +# ("invalid_dom_range_res", [ch_name, res_name, res_num], +# []) +# ) + +# return issues + + +# def _get_entities(json_dict, ncbi_data, cif_ent, mdl_type): +# """Gather data for the mmCIF (target) entities. 
+# Returns (list of target entities, list of issues) +# """ +# issues = [] +# # get NCBI data +# ncbi_ac = json_dict["source_sequence_RefSeq_ID"] +# ncbi_item = ncbi_data[ncbi_ac] +# ncbi_info = ncbi_item["info"] +# assert ncbi_ac == ncbi_info["AccessionVersion"] +# ncbi_seq = ncbi_item["seq_str"] +# # get json_dict data +# if mdl_type == "assembly": +# ent_desc = json_dict["source_sequence_description"] +# # full seq. +# seqres = json_dict["source_sequence"] +# ncbi_range = (1, len(ncbi_seq)) +# mdl_range = ncbi_range +# else: +# label = f'domain/motif "{json_dict["domain_name"]}"' +# ent_desc = f'{json_dict["source_sequence_description"]} ({label})' +# # cut seq. +# seqres = json_dict["domain_sequence"] +# ncbi_range = ( +# json_dict["domain_sequence_start"], +# json_dict["domain_sequence_end"], +# ) +# mdl_range = (1, len(seqres)) +# auth_seq_id_map = { +# pdb_idx: ncbi_idx +# for pdb_idx, ncbi_idx in zip( +# range(mdl_range[0], mdl_range[1] + 1), +# range(ncbi_range[0], ncbi_range[1] + 1), +# ) +# } +# # compare with NCBI +# ncbi_seq_cut = ncbi_seq[ncbi_range[0] - 1 : ncbi_range[1]] +# if seqres != ncbi_seq_cut: +# # one case exists with a single mismatch +# assert len(seqres) == len(ncbi_seq_cut) +# mismatches = [ +# (idx + 1, c1, c2) +# for idx, (c1, c2) in enumerate(zip(seqres, ncbi_seq_cut)) +# if c1 != c2 +# ] +# issues.append(("mismatches", mismatches, [seqres, ncbi_seq_cut])) +# else: +# mismatches = [] +# # fill data +# cif_ch_names, cif_mdlsqe, _, _ = _process_ent(cif_ent) +# entity = { +# "seqres": seqres, +# "ncbi_seq": ncbi_seq, +# "sqe_no_gaps": cif_mdlsqe, +# "mismatches": mismatches, +# "ncbi_range": ncbi_range, +# "mdl_range": mdl_range, +# "auth_seq_id_map": auth_seq_id_map, +# "pdb_chain_ids": cif_ch_names, +# "description": ent_desc, +# "ncbi_acv": ncbi_info["AccessionVersion"], +# "ncbi_gi": str(ncbi_info["Gi"]), +# "ncbi_taxid": str(ncbi_info["TaxId"]), +# "ncbi_organism": ncbi_info["SpeciesName"], +# "ncbi_last_mod": datetime.datetime.strptime( +# ncbi_info["UpdateDate"], "%Y/%m/%d" +# ), +# } +# return [entity], issues + + +################################################################################ + + +################################################################################ +# ModelCIF HANDLING +################################################################################ +# pylint: disable=too-few-public-methods +# class _GlobalPCR( +# modelcif.qa_metric.Global, modelcif.qa_metric.NormalizedScore +# ): +# """Percentage of confident (score > 70) residues in [0,1]""" + +# name = "percent confident residues" +# software = None + + +# class _GlobalPRC(modelcif.qa_metric.Global, modelcif.qa_metric.PLDDT): +# """Average per-residue confidence (pLDDT-like) in [0,1]""" + +# name = "average per-residue confidence (pLDDT-like)" +# software = None + + +# class _GlobalPRC_pLDDT(modelcif.qa_metric.Global, modelcif.qa_metric.PLDDT): +# """Average per-residue confidence (pLDDT) in [0,1]""" + +# name = "average per-residue confidence (pLDDT)" +# software = None + + +# class _LocalPLDDT(modelcif.qa_metric.Local, modelcif.qa_metric.PLDDT): +# """Predicted accuracy according to the CA-only lDDT in [0,100]""" + +# name = "pLDDT" +# software = None + + +# class _LocalMOD(modelcif.qa_metric.Local, modelcif.qa_metric.Energy): +# """Per-residue MODELLER energy score (>0, lower is better).""" + +# name = "MODELLER energy score" +# software = None + + +# class _LocalPRC(modelcif.qa_metric.Local, modelcif.qa_metric.PLDDT): +# """Per-residue confidence score in [0,100].""" 
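+#     # Sketch, not part of the class (assumes numpy as np): for assemblies
+#     # built from several truncated parts, the per-residue confidence
+#     # reported via this metric is described in the QE protocol step as
+#     #     conf = np.clip(
+#     #         plddt - 60.0 / (1.0 + np.exp((150.0 - energy) / 20.0)),
+#     #         0.0, 100.0,
+#     #     )
+#     # e.g. plddt=90, energy=300 -> ~30.0; plddt=90, energy=50 -> ~89.6,
+#     # i.e. the penalty saturates at 60 for very large MODELLER energies
+#     # and vanishes for small ones.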
+ +# name = "confidence score (pLDDT-like)" +# software = None + + +# class _NcbiTrgRef(modelcif.reference.TargetReference): +# """NCBI as target reference.""" + +# name = "NCBI" +# other_details = None + + +# class _LPeptideAlphabetWithXO(ihm.LPeptideAlphabet): +# """Have the default amino acid alphabet plus 'X' for unknown residues +# and 'O' as allowed non-def. AA (U already in alphabet).""" + +# # extra entry added according to LPeptideAlphabet def. in +# # https://python-ihm.readthedocs.io/en/latest/_modules/ihm.html +# # and https://files.rcsb.org/view/1NTH.cif for values for 'O'. + +# def __init__(self): +# """Create the alphabet.""" +# super().__init__() +# self._comps["X"] = self._comps["UNK"] +# self._comps["O"] = ihm.LPeptideChemComp( +# "PYL", "O", "O", "PYRROLYSINE", "C12 H21 N3 O3" +# ) + + +# # pylint: enable=too-few-public-methods + + +# # TEST: _get_not_modeled_residues from modelcif/util/make_mmcif.py +# def _get_not_modeled_residues(model): +# """Yield NotModeledResidueRange objects for all residue ranges in the +# Model that are not referenced by Atom, Sphere, or pre-existing +# NotModeledResidueRange objects""" +# for assem in model.assembly: +# asym = assem.asym if hasattr(assem, "asym") else assem +# if not asym.entity.is_polymeric(): +# continue +# # Make a set of all residue indices of this asym "handled" either +# # by being modeled (with Atom or Sphere objects) or by being +# # explicitly marked as not-modeled +# handled_residues = set() +# for rr in model.not_modeled_residue_ranges: +# if rr.asym_unit is asym: +# for seq_id in range(rr.seq_id_begin, rr.seq_id_end + 1): +# handled_residues.add(seq_id) +# for atom in model.get_atoms(): +# if atom.asym_unit is asym: +# handled_residues.add(atom.seq_id) +# # Convert set to a list of residue ranges +# handled_residues = ihm.util._make_range_from_list( +# sorted(handled_residues) +# ) +# # Return not-modeled for each non-handled range +# for r in ihm.util._invert_ranges( +# handled_residues, +# end=assem.seq_id_range[1], +# start=assem.seq_id_range[0], +# ): +# yield modelcif.model.NotModeledResidueRange(asym, r[0], r[1]) + + +# class _OST2ModelCIF(modelcif.model.AbInitioModel): +# """Map OST entity elements to ihm.model""" + +# def __init__(self, *args, **kwargs): +# """Initialise a model""" +# # process kwargs +# for i in ["ost_entity", "asym", "scores_json"]: +# if i not in kwargs: +# raise TypeError(f"Required keyword argument '{i}' not found.") +# self.ost_entity = kwargs.pop("ost_entity") +# self.asym = kwargs.pop("asym") +# self.scores_json = kwargs.pop("scores_json") + +# # call parent init (needs to pop new kwargs first!) +# super().__init__(*args, **kwargs) + +# # use auth IDs for res. nums and chain names +# self.use_auth = False + +# # fetch b-factors for all residues (for get_atoms) +# self.res_bfactors = { +# r.qualified_name: self.scores_json["b_factors"][i] +# for i, r in enumerate(self.ost_entity.residues) +# } + +# # need reverse mapping from chain name + res num to seq. id +# self.seq_id_map = { +# ch_id: {v: k for k, v in asym_unit.auth_seq_id_map.items()} +# for ch_id, asym_unit in self.asym.items() +# } + +# # explicitly add parts which were not modelled +# for ch in self.ost_entity.chains: +# # get covered range +# asym_unit = self.asym[_get_ch_name(ch, self.use_auth)] +# res_nums = [res.number.num for res in ch.residues] +# res_first = min(res_nums) +# res_last = max(res_nums) +# # assertion true in this set (no gaps in modelled res. 
nums) +# assert res_nums == list(range(res_first, res_last + 1)) +# # compare with ent seq. +# mdl_seq_first = self.seq_id_map[ch.name][res_first] +# mdl_seq_last = self.seq_id_map[ch.name][res_last] +# ent_seq_first, ent_seq_last = asym_unit.seq_id_range +# if mdl_seq_first != ent_seq_first: +# assert mdl_seq_first > ent_seq_first +# self.not_modeled_residue_ranges.append( +# modelcif.model.NotModeledResidueRange( +# asym_unit, +# ent_seq_first, +# mdl_seq_first - 1, +# ) +# ) +# # NOTE: the case below doesn't actually happen in this set... +# if mdl_seq_last != ent_seq_last: +# assert mdl_seq_last < ent_seq_last +# self.not_modeled_residue_ranges.append( +# modelcif.model.NotModeledResidueRange( +# asym_unit, +# mdl_seq_last + 1, +# ent_seq_last, +# ) +# ) +# # -> note: could be auto-filled as in modelcif/util/make_mmcif.py +# # (see commented _get_not_modeled_residues TEST here) +# # -> see https://github.com/ihmwg/python-modelcif/issues/37 + +# def get_atoms(self): +# # ToDo [internal]: Take B-factor out since its not a B-factor? +# # NOTE: this assumes that _get_res_num maps residue to pos. in seqres +# # within asym +# for atm in self.ost_entity.atoms: +# yield modelcif.model.Atom( +# asym_unit=self.asym[_get_ch_name(atm.chain, self.use_auth)], +# seq_id=self.seq_id_map[atm.chain.name][atm.residue.number.num], +# atom_id=atm.name, +# type_symbol=atm.element, +# x=atm.pos[0], +# y=atm.pos[1], +# z=atm.pos[2], +# het=atm.is_hetatom, +# biso=self.res_bfactors[atm.residue.qualified_name], +# occupancy=atm.occupancy, +# ) + +# def add_scores(self): +# """Add QA metrics from AF2 scores.""" +# # global scores +# for score, score_class in [ +# ("percent_confident_residues", _GlobalPCR), +# ("mean_per_res_conf", _GlobalPRC), +# ("mean_per_res_conf_plddt", _GlobalPRC_pLDDT), +# ]: +# if score in self.scores_json: +# self.qa_metrics.append(score_class(self.scores_json[score])) + +# # local scores +# for score, score_class in [ +# ("pLDDT", _LocalPLDDT), +# ("MODELLER_energy_score", _LocalMOD), +# ("per_res_conf", _LocalPRC), +# ]: +# if score in self.scores_json: +# i = 0 +# for chn in self.ost_entity.chains: +# for res in chn.residues: +# seq_id = self.seq_id_map[chn.name][res.number.num] +# asym = self.asym[chn.name].residue(seq_id) +# self.qa_metrics.append( +# score_class(asym, self.scores_json[score][i]) +# ) +# i += 1 + + +# def _get_modelcif_entities(target_ents, asym_units, system): +# """Create ModelCIF entities and asymmetric units.""" +# alphabet = _LPeptideAlphabetWithXO() +# for cif_ent in target_ents: +# # collect references +# ncbi_ref = _NcbiTrgRef( +# code=cif_ent["ncbi_acv"], +# accession=cif_ent["ncbi_gi"], +# ncbi_taxonomy_id=cif_ent["ncbi_taxid"], +# organism_scientific=cif_ent["ncbi_organism"], +# sequence_version_date=cif_ent["ncbi_last_mod"], +# sequence=cif_ent["ncbi_seq"], +# ) +# # add alignment incl. 
mismatches +# ncbi_ref.alignments.append( +# modelcif.reference.Alignment( +# db_begin=cif_ent["ncbi_range"][0], +# db_end=cif_ent["ncbi_range"][1], +# entity_begin=cif_ent["mdl_range"][0], +# entity_end=cif_ent["mdl_range"][1], +# seq_dif=[ +# ihm.reference.SeqDif( +# seq_id, alphabet[olc_db], alphabet[olc] +# ) +# for seq_id, olc, olc_db in cif_ent["mismatches"] +# ], +# ) +# ) +# # +# references = [ncbi_ref] +# # combine into ModelCIF entity +# mdlcif_ent = modelcif.Entity( +# cif_ent["seqres"], +# description=cif_ent["description"], +# alphabet=alphabet, +# source=ihm.source.Natural( +# ncbi_taxonomy_id=cif_ent["ncbi_taxid"], +# scientific_name=cif_ent["ncbi_organism"], +# ), +# references=references, +# ) +# # NOTE: this assigns (potentially new) alphabetic chain names +# for pdb_chain_id in cif_ent["pdb_chain_ids"]: +# asym_units[pdb_chain_id] = modelcif.AsymUnit( +# mdlcif_ent, +# strand_id=pdb_chain_id, +# auth_seq_id_map=cif_ent["auth_seq_id_map"], +# ) +# system.entities.append(mdlcif_ent) + + +# def _get_assoc_af2_pdb_data(pred): +# """Generate a modelcif.data.Data object for an AF2 prediction.""" +# return modelcif.data.Data( +# name=f"AlphaFold-Multimer (v{pred['version']}) model for range " +# + f"{_get_af2_range_string(pred)}; pTM {pred['pTM']}, " +# + f"ipTM {pred['ipTM']}", +# ) + + +# def _get_assoc_af2_json_data(af2_path): +# """Generate a modelcif.data.Data object for JSON file +# with extra QE for PDB file in af2_path. +# """ +# return modelcif.data.Data( +# name=f"Detailed quality estimates (pLDDT, PAE) for {af2_path}", +# ) + + +# def _get_assoc_pae_png_data(af2_path): +# """Generate a modelcif.data.Data object for PNG file +# with PAE plot for PDB file in af2_path. +# """ +# return modelcif.data.Data( +# name=f"Plot showing PAE matrix for {af2_path}", +# ) + + +# def _get_assoc_trunc_pdb_data(part, is_single): +# """Generate a modelcif.associated.File object pointing to PDB file +# for truncated model an AF2 prediction. +# """ +# if is_single: +# mdl_txt = "model" +# else: +# mdl_txt = "and superposed model" +# return modelcif.data.Data( +# name=f"Truncated {mdl_txt} for range {part['part_start']}-" +# + f"{part['part_end']} derived from AlphaFold-Multimer model " +# + f"{part['source_prediction']}.pdb", +# ) + + +# def _get_associated_file( +# fle_path, data, file_format="other", file_content="other" +# ): +# """Generate a modelcif.associated.File object for given data.""" +# afile = modelcif.associated.File( +# fle_path, +# details=data.name, +# data=data, +# ) +# afile.file_format = file_format +# afile.file_content = file_content +# return afile + + +# def _get_associated_files(mdl_name, arc_files): +# """Create entry for associated files.""" +# # package all into zip file +# return modelcif.associated.Repository( +# "", +# [modelcif.associated.ZipFile(f"{mdl_name}.zip", files=arc_files)], +# ) +# # NOTE: by convention MA expects zip file with same name as model-cif + + +# global_ref_dbs = {} + + +# def _get_ref_db_object(name, url, version=None, release_date=None): +# """Cached access to modelcif.ReferenceDatabase objects. +# Needed to remove duplicates in ModelCIF. +# """ +# key = (name, url, version, release_date) +# if key not in global_ref_dbs: +# global_ref_dbs[key] = modelcif.ReferenceDatabase( +# name, url, version, release_date +# ) +# return global_ref_dbs[key] + + +# def _get_sequence_dbs(config_data): +# """Get AF seq. DBs.""" +# # hard coded UniProt release (see https://www.uniprot.org/release-notes) +# # (TO BE UPDATED FOR EVERY DEPOSITION!) 
+# pdb_rel_date = config_data["pdb_rel_date"] +# up_version = config_data["up_version"] +# up_version_dates = { +# "2021_03": datetime.datetime(2021, 6, 2), +# "2022_05": datetime.datetime(2022, 12, 14), +# } +# up_rel_date = up_version_dates[up_version] +# # fill list of DBs +# seq_dbs = [] +# if config_data["use_small_bfd"]: +# seq_dbs.append( +# _get_ref_db_object( +# "Reduced BFD", +# "https://storage.googleapis.com/alphafold-databases/" +# + "reduced_dbs/bfd-first_non_consensus_sequences.fasta.gz", +# ) +# ) +# else: +# seq_dbs.append( +# _get_ref_db_object( +# "BFD", +# "https://storage.googleapis.com/alphafold-databases/" +# + "casp14_versions/" +# + "bfd_metaclust_clu_complete_id30_c90_final_seq.sorted_opt.tar.gz", +# version="6a634dc6eb105c2e9b4cba7bbae93412", +# ) +# ) +# if config_data["af_version"] < "2.3.0": +# seq_dbs.append( +# _get_ref_db_object( +# "MGnify", +# "https://storage.googleapis.com/alphafold-databases/" +# + "casp14_versions/mgy_clusters_2018_12.fa.gz", +# version="2018_12", +# release_date=datetime.datetime(2018, 12, 6), +# ) +# ) +# seq_dbs.append( +# _get_ref_db_object( +# "Uniclust30", +# "https://storage.googleapis.com/alphafold-databases/" +# + "casp14_versions/uniclust30_2018_08_hhsuite.tar.gz", +# version="2018_08", +# release_date=None, +# ) +# ) +# else: +# # NOTE: release date according to https://ftp.ebi.ac.uk/pub/... +# seq_dbs.append( +# _get_ref_db_object( +# "MGnify", +# "https://storage.googleapis.com/alphafold-databases/" +# + "v2.3/mgy_clusters_2022_05.fa.gz", +# version="2022_05", +# release_date=datetime.datetime(2022, 5, 6), +# ) +# ) +# seq_dbs.append( +# _get_ref_db_object( +# "UniRef30", +# "https://storage.googleapis.com/alphafold-databases/" +# + "v2.3/UniRef30_2021_03.tar.gz", +# version="2021_03", +# release_date=None, +# ) +# ) +# if config_data["use_multimer"]: +# seq_dbs.append( +# _get_ref_db_object( +# "TrEMBL", +# "ftp://ftp.ebi.ac.uk/pub/databases/uniprot/current_release/" +# + "knowledgebase/complete/uniprot_trembl.fasta.gz", +# version=up_version, +# release_date=up_rel_date, +# ) +# ) +# seq_dbs.append( +# _get_ref_db_object( +# "Swiss-Prot", +# "ftp://ftp.ebi.ac.uk/pub/databases/uniprot/current_release/" +# + "knowledgebase/complete/uniprot_sprot.fasta.gz", +# version=up_version, +# release_date=up_rel_date, +# ) +# ) +# seq_dbs.append( +# _get_ref_db_object( +# "UniRef90", +# "ftp://ftp.uniprot.org/pub/databases/uniprot/uniref/uniref90/" +# + "uniref90.fasta.gz", +# version=up_version, +# release_date=up_rel_date, +# ) +# ) +# if config_data["use_templates"]: +# if config_data["use_multimer"]: +# # uses whatever is latest set of PDB sequences +# # see AF2 scripts/download_pdb_seqres.sh +# seq_dbs.append( +# _get_ref_db_object( +# "PDB seqres", +# "https://ftp.wwpdb.org/pub/pdb/derived_data/pdb_seqres.txt", +# release_date=pdb_rel_date, +# ) +# ) +# else: +# # fixed version used in AF2 scripts/download_pdb70.sh +# seq_dbs.append( +# _get_ref_db_object( +# "PDB70", +# "http://wwwuser.gwdg.de/~compbiol/data/hhsuite/databases/" +# + "hhsuite_dbs/old-releases/pdb70_from_mmcif_200401.tar.gz", +# release_date=datetime.datetime(2020, 4, 1), +# ) +# ) +# return seq_dbs + + +# def _assemble_modelcif_software(soft_dict, params_dict): +# """Create a modelcif.SoftwareWithParameters instance from dictionaries.""" +# # create SW object +# sw = modelcif.Software( +# soft_dict["name"], +# soft_dict["classification"], +# soft_dict["description"], +# soft_dict["location"], +# soft_dict["type"], +# soft_dict["version"], +# 
citation=soft_dict["citation"], +# ) +# # assemble parameters +# params = [] +# for key, val in params_dict.items(): +# params.append(modelcif.SoftwareParameter(key, val)) +# # put them together +# return modelcif.SoftwareWithParameters(sw, params) + + +# def _get_modelcif_protocol_software(js_step): +# """Assemble software entries for a ModelCIF protocol step.""" +# # new setup in python-modelcif (as of late 2023): params with each SW +# sw_list = [] +# for sw, sw_params in js_step["software_plus_params"]: +# sw_list.append(_assemble_modelcif_software(sw, sw_params)) +# # group and done... +# if sw_list: +# return modelcif.SoftwareGroup(sw_list) +# else: +# return None + + +# data_group_cache = {} + + +# def _get_modelcif_protocol_data(data_labels, target_entities, model, +# acc_data): +# """Assemble data for a ModelCIF protocol step. +# Cached access to objects needed to remove duplicates in ModelCIF. +# """ +# cache_key = "|".join(sorted(data_labels)) +# if cache_key in data_group_cache: +# return data_group_cache[cache_key] +# data = modelcif.data.DataGroup() +# for data_label in data_labels: +# if data_label.startswith("target_sequences_"): +# data.extend(target_entities) +# elif data_label == "ref_dbs_AF2_2.1.2": +# # HC for that version +# data.extend( +# _get_sequence_dbs( +# { +# "pdb_rel_date": datetime.datetime(2021, 11, 5), +# "up_version": "2021_03", +# "use_small_bfd": True, +# "af_version": "2.1.2", +# "use_multimer": True, +# "use_templates": True, +# } +# ) +# ) +# elif data_label == "ref_dbs_AF2_2.3.1": +# # HC for that version +# data.extend( +# _get_sequence_dbs( +# { +# "pdb_rel_date": datetime.datetime(2022, 12, 9), +# "up_version": "2022_05", +# "use_small_bfd": True, +# "af_version": "2.3.1", +# "use_multimer": True, +# "use_templates": True, +# } +# ) +# ) +# elif data_label.startswith("model_"): +# data.append(model) +# elif data_label in acc_data: +# data.append(acc_data[data_label]) +# else: +# raise RuntimeError(f"Unknown protocol data: '{data_label}'") +# data_group_cache[cache_key] = data +# return data + + +# def _get_modelcif_protocol(protocol_steps, target_entities, model, acc_data): +# """Create the protocol for the ModelCIF file.""" +# protocol = modelcif.protocol.Protocol() +# for js_step in protocol_steps: +# sftwre = _get_modelcif_protocol_software(js_step) +# input_data = _get_modelcif_protocol_data( +# js_step["input"], target_entities, model, acc_data +# ) +# output_data = _get_modelcif_protocol_data( +# js_step["output"], target_entities, model, acc_data +# ) + +# protocol.steps.append( +# modelcif.protocol.Step( +# input_data=input_data, +# output_data=output_data, +# name=js_step["name"], +# details=js_step["details"], +# software=sftwre, +# ) +# ) +# protocol.steps[-1].method_type = js_step["method_type"] +# return protocol + + +# def _compress_cif_file(cif_file): +# """Compress cif file and delete original.""" +# with open(cif_file, "rb") as f_in: +# with gzip.open(cif_file + ".gz", "wb") as f_out: +# shutil.copyfileobj(f_in, f_out) +# os.remove(cif_file) + + +# def _package_associated_files(repo, dict_data): +# """Compress associated files into single zip file and delete original. +# Data can either be on disk or available in dict_data (key = path). 
+# """ +# # zip settings tested for good speed vs compression +# for archive in repo.files: +# with zipfile.ZipFile(archive.path, "w", zipfile.ZIP_BZIP2) as cif_zip: +# for zfile in archive.files: +# zpath = zfile.path +# if zpath in dict_data: +# cif_zip.writestr(zpath, dict_data[zpath]) +# else: +# cif_zip.write(zpath, arcname=zpath) +# os.remove(zpath) + + +# def _pkl_to_json(pkl_data, pae_handling="keep"): +# """Make pkl_data JSON writable. +# Options for pae_handling: +# - keep = values kept as they are +# - round = values kept as closest integers +# - drop = drop whole table +# """ +# pkl_json_data = {} +# for k, v in pkl_data.items(): +# if k == "PAE": +# if pae_handling == "keep": +# pkl_json_data["PAE"] = pkl_data["PAE"].tolist() +# elif pae_handling == "round": +# pae = pkl_data["PAE"].round().astype(int).tolist() +# pkl_json_data["PAE"] = pae +# elif pae_handling == "drop": +# pass +# else: +# raise RuntimeError( +# f"Unknown pae_handling value {pae_handling}" +# ) +# elif k != "pLDDT_chains": +# if type(v) == np.ndarray: +# # note: scalars become scalars! +# v = v.tolist() +# pkl_json_data[k] = v +# return pkl_json_data + + +# def _get_sw_for_qe(steps, step_name): +# """Fetch suitable SW objects from protocol steps to use in QE.""" +# # to maximally reduce duplicates we reuse single groups +# # otherwise new group created using same SoftwareWithParameters objects +# sw_groups = [step.software for step in steps if step.name == step_name] +# if len(sw_groups) == 0: +# return None +# elif len(sw_groups) == 1: +# return sw_groups[0] +# else: +# # each sw_group is a list (SoftwareGroup) of SoftwareWithParameters +# # ...and we remove duplicates...just in case +# sw_dict = {} +# for sw_group in sw_groups: +# sw_dict.update({hash(swp): swp for swp in sw_group}) +# return modelcif.SoftwareGroup(sw_dict.values()) + + +# def _store_as_modelcif( +# data_json, +# ost_ent, +# zip_dict, +# kept_cif_cats, +# out_dir, +# mdl_name, +# compress, +# add_extra_files, +# ): +# """Mix all the data into a ModelCIF file.""" +# print(" generating ModelCIF objects...", end="") +# pstart = timer() +# # create system to gather all the data +# system = modelcif.System( +# title=data_json["title"], +# id=data_json["mdl_id"].upper(), +# model_details=data_json["model_details"], +# ) + +# # create an asymmetric unit and an entity per target sequence +# asym_units = {} +# _get_modelcif_entities(data_json["target_entities"], asym_units, system) + +# # audit_authors +# system.authors.extend(data_json["audit_authors"]) + +# # set up the model to produce coordinates +# model = _OST2ModelCIF( +# assembly=modelcif.Assembly(asym_units.values()), +# asym=asym_units, +# ost_entity=ost_ent, +# scores_json=data_json["scores"], +# name=None, +# ) +# # TEST +# # model.not_modeled_residue_ranges.extend( +# # _get_not_modeled_residues(model)) +# # +# print(f" ({timer()-pstart:.2f}s)") +# print(" processing QA scores...", end="", flush=True) +# pstart = timer() +# model.add_scores() +# print(f" ({timer()-pstart:.2f}s)") + +# model_group = modelcif.model.ModelGroup([model]) +# system.model_groups.append(model_group) + +# # handle additional files +# acc_data = {} +# acc_files = [] +# dict_data = {} +# for pred in data_json["af2_preds"]: +# # get data objects +# pdb_file, pkl_file, pae_file = _get_af2_files(pred["prediction_p"]) +# acc_data[pdb_file] = _get_assoc_af2_pdb_data(pred) +# # NOTE: we keep data object for mismatching PKL file even though we +# # do not dump file itself +# acc_data[pkl_file] = 
_get_assoc_af2_json_data(pdb_file) +# acc_data[pae_file] = _get_assoc_pae_png_data(pdb_file) +# if add_extra_files: +# # get file objects +# acc_files.append( +# _get_associated_file(pdb_file, acc_data[pdb_file]) +# ) +# if pkl_file not in data_json["pkl_files_to_skip"]: +# json_file = f"{os.path.splitext(pkl_file)[0]}.json" +# acc_files.append( +# _get_associated_file( +# json_file, acc_data[pkl_file], file_format="json" +# ) +# ) +# acc_files.append( +# _get_associated_file(pae_file, acc_data[pae_file]) +# ) +# # get file content +# dict_data[pdb_file] = zip_dict["accompanying_data"][pdb_file] +# if pkl_file not in data_json["pkl_files_to_skip"]: +# pkl_data = pickle.loads( +# zip_dict["accompanying_data"][pkl_file] +# ) +# pkl_json_data = _pkl_to_json(pkl_data, pae_handling="round") +# dict_data[json_file] = json.dumps(pkl_json_data) +# dict_data[pae_file] = zip_dict["accompanying_data"][pae_file] +# for part in data_json["trunc_parts"]: +# part_path = part["truncated_prediction_path"] +# if part_path != "model": +# acc_data[part_path] = _get_assoc_trunc_pdb_data( +# part, len(data_json["trunc_parts"]) == 1 +# ) +# if add_extra_files: +# acc_files.append( +# _get_associated_file(part_path, acc_data[part_path]) +# ) +# dict_data[part_path] = \ +# zip_dict["accompanying_data"][part_path] +# if acc_files: +# system.repositories.append(_get_associated_files(mdl_name, acc_files)) + +# # get data and steps +# protocol = _get_modelcif_protocol( +# data_json["protocol"], +# system.entities, +# model, +# acc_data, +# ) +# system.protocols.append(protocol) + +# # set SW for QE +# _LocalPLDDT.software = _get_sw_for_qe(protocol.steps, "AF2") +# _GlobalPRC_pLDDT.software = _LocalPLDDT.software +# _LocalMOD.software = _get_sw_for_qe(protocol.steps, "MODELLER") + +# # write modelcif System to file +# print(" write to disk...", end="", flush=True) +# pstart = timer() +# # NOTE: this will dump PAE on path provided in add_scores +# # -> hence we cheat by changing path and back while being ... 
+# oldpwd = os.getcwd() +# os.chdir(out_dir) +# mdl_fle = f"{mdl_name}.cif" +# try: +# # dump to string and fix with gemmi +# with StringIO() as mmcif_fh: +# modelcif.dumper.write(mmcif_fh, [system]) +# modelcif_str = mmcif_fh.getvalue() +# doc = gemmi.cif.read_string(modelcif_str) +# block = doc.sole_block() +# # HACK: set all label_alt_id to '-' +# # -> see https://github.com/wwpdb-dictionaries/mmcif_pdbx/issues/60 +# if kept_cif_cats: +# cat_dict = block.get_mmcif_category("_atom_site.") +# cat_dict["label_alt_id"] = ["-"] * len(cat_dict["label_alt_id"]) +# block.set_mmcif_category("_atom_site.", cat_dict) +# cat_dict = kept_cif_cats["_pdbx_domain_range."] +# new_alt_ids = ["-"] * len(cat_dict["beg_label_alt_id"]) +# cat_dict["beg_label_alt_id"] = new_alt_ids +# new_alt_ids = ["-"] * len(cat_dict["end_label_alt_id"]) +# cat_dict["end_label_alt_id"] = new_alt_ids +# # +# for cat, cat_dict in kept_cif_cats.items(): +# block.set_mmcif_category(cat, cat_dict) +# doc.write_file(mdl_fle) +# # extra files +# with open(f"{mdl_name}-image.png", "wb") as fh: +# fh.write(zip_dict["image_file"]) +# if acc_files: +# _package_associated_files(system.repositories[0], dict_data) +# if compress: +# _compress_cif_file(mdl_fle) +# mdl_fle += ".gz" +# finally: +# os.chdir(oldpwd) +# print(f" ({timer()-pstart:.2f}s)") + + +################################################################################ + + +################################################################################ +# HANDLE FULL DATA SET +################################################################################ +# def _translate2modelcif(ncbi_data, opts): +# """Convert a model with its accompanying data to ModelCIF.""" +# # get main names +# mdl_id = os.path.splitext(os.path.basename(opts.input_zip_path))[0] +# if opts.compress: +# cifext = "cif.gz" +# else: +# cifext = "cif" +# mdl_path = os.path.join(opts.out_dir, f"{mdl_id}.{cifext}") +# # skip if done already (disabled here) +# # if os.path.exists(mdl_path): +# # print(f" {mdl_id} already done...") + +# # prepare data for model to convert (also gets all issues) +# issues = [] +# abort_after_checks = opts.checks_only +# zip_dict = _get_zip_content(opts.input_zip_path) +# mdl_type = zip_dict["mdl_type"] +# if zip_dict["unparsed_files"]: +# file_list = zip_dict["unparsed_files"] +# issues.append(("unparsed_files", len(file_list), file_list)) +# json_dict = json.loads(zip_dict["json_file"]) +# # apply hard-coded fixes for some models +# if mdl_id == "ma-taas-2330": +# # find entry to fix +# for pred in json_dict["AF_predictions"]: +# if pred["prediction_p"] == "AFv2.3.1_389-515_mergeN_21.pdb": +# pred["prediction_p"] = "AFv2.3.1_389-515.pdb" +# del pred["merged_GCN4linker_len_N"] +# del pred["merged_GCN4linker_len_C"] +# elif mdl_id == "ma-taas-9424": +# # len(json_dict["pLDDT"]) was 53 instead of 52 +# # -> also in wrong abstract... 
+# json_dict["pLDDT"] = json_dict["pLDDT"][:52] +# json_dict["abstract"] = json_dict["abstract"].replace( +# "53 residues", "52 residues" +# ) +# cif_ent, kept_cif_cats = _load_cif_file(zip_dict) +# try: +# issues.extend( +# _do_sanity_checks( +# mdl_id, json_dict, zip_dict, cif_ent, kept_cif_cats +# ) +# ) +# entities, ent_issues = _get_entities( +# json_dict, ncbi_data, cif_ent, mdl_type +# ) +# issues.extend(ent_issues) +# except Exception as ex: +# short_txt = f"{type(ex).__name__}: {ex}" +# long_txt = traceback.format_exc() +# issues.append(("exception", short_txt, long_txt)) +# abort_after_checks = True + +# # dump issues +# issues_file_path = os.path.join(opts.out_dir, f"{mdl_id}-issues.json") +# json.dump(issues, open(issues_file_path, "w")) + +# # abort if needed +# if abort_after_checks: +# return + +# # do the actual conversion +# print(f" translating {mdl_id}...") +# pdb_start = timer() + +# # gather data into JSON-like structure +# print(" preparing data...", end="") +# pstart = timer() +# af2_preds, trunc_parts = _get_preds_and_parts(json_dict, mdl_type) +# mdlcf_json = { +# "title": json_dict["title"].strip(), +# "model_details": json_dict["abstract"].strip(), +# "audit_authors": _get_audit_authors(), +# "af2_preds": af2_preds, +# "trunc_parts": trunc_parts, +# "protocol": _get_protocol_steps_and_software( +# json_dict, mdl_type, mdl_id +# ), +# "mdl_id": mdl_id, # used for entry ID +# "target_entities": entities, +# "scores": { +# score: json_dict[score] +# for score in [ +# "pLDDT", +# "MODELLER_energy_score", +# "per_res_conf", +# "percent_confident_residues", +# "mean_per_res_conf", +# ] +# if score in json_dict +# }, +# "pkl_files_to_skip": [], +# } +# # fix percentage to [0,1] +# mdlcf_json["scores"]["percent_confident_residues"] /= 100.0 +# # enlarge local scores to be for each residue +# for score in ["pLDDT", "MODELLER_energy_score", "per_res_conf"]: +# if score in mdlcf_json["scores"]: +# mdlcf_json["scores"][score] *= cif_ent.chain_count +# # note: we overwrite b-factors from file! +# # -> also choose which global score to use for mean_per_res_conf +# if "per_res_conf" in mdlcf_json["scores"]: +# mdlcf_json["scores"]["b_factors"] = mdlcf_json["scores"][ +# "per_res_conf" +# ] +# else: +# mdlcf_json["scores"]["b_factors"] = mdlcf_json["scores"]["pLDDT"] +# mdlcf_json["scores"]["mean_per_res_conf_plddt"] = mdlcf_json[ +# "scores" +# ].pop("mean_per_res_conf") +# # check for inconsistent pkl files +# for issue_type, short_data, long_data in issues: +# if issue_type.startswith("pkl_") and issue_type.endswith("_diff"): +# mdlcf_json["pkl_files_to_skip"].append(short_data[1]) +# print(f" ({timer()-pstart:.2f}s)") + +# # save ModelCIF +# _store_as_modelcif( +# data_json=mdlcf_json, +# ost_ent=cif_ent, +# zip_dict=zip_dict, +# kept_cif_cats=kept_cif_cats, +# out_dir=opts.out_dir, +# mdl_name=mdl_id, +# compress=opts.compress, +# add_extra_files=(not opts.no_extra_files), +# ) + +# # check if result can be read and has expected seq. 
+# ent, ss = io.LoadMMCIF(mdl_path, seqres=True) +# ent_seqres = [ss.FindSequence(chn.name).string for chn in ent.chains] +# exp_seqres = [] +# for trg_ent in mdlcf_json["target_entities"]: +# exp_seqres += [trg_ent["seqres"]] * len(trg_ent["pdb_chain_ids"]) +# assert ent_seqres == exp_seqres, f"Bad seqres {mdl_id}" +# # check model itself +# ch_names, mdlsqe, resnum_range, ent_bfs = _process_ent(ent) +# assert ch_names == ["A", "B", "C"] +# target_entity = mdlcf_json["target_entities"][0] +# assert mdlsqe == target_entity["sqe_no_gaps"] +# assert max(resnum_range) <= len(target_entity["seqres"]) +# if mdl_type == "assembly": +# seq_range = ( +# json_dict["predicted_sequence_start"], +# json_dict["predicted_sequence_end"], +# ) +# assert resnum_range == seq_range +# else: +# assert resnum_range == (1, len(mdlsqe)) +# exp_bfs = mdlcf_json["scores"]["b_factors"] +# bfs_max_diff = np.max(abs(np.asarray(ent_bfs) - np.asarray(exp_bfs))) +# assert bfs_max_diff < 0.01 + +# print(f" ... done with {mdl_id} ({timer()-pdb_start:.2f}s).") + + +def _main(): + """Run as script.""" + # 1. Update ModelCIF files with UniProtKB AC incl. range, entity + # description text, entry title, entry description from CSV file + # 2. Validate sequence compared to UniProt (incl. checking/fetching seq. + # version as done in SCHWED-6162 for Ian's set) + opts = _parse_args() + + # handle single entry (print statements kept consistent with other sets) + print("Working on models...") + + print("... done with models.") + + # TEST: to judge res. needed on cluster + # import resource + # print("mem", resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1000) + + +if __name__ == "__main__": + _main()
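

################################################################################
# SKETCH: UniProtKB CHECKS (NOT WIRED INTO _main() YET)
################################################################################
# Illustrative sketch only, covering the UniProtKB part of the steps listed in
# _main(): fetch the UniProtKB sequence for an accession and compare it with a
# model SEQRES over a given range. The helper names
# (_fetch_uniprotkb_sequence, _check_seqres_vs_uniprot) and the REST endpoint
# used here are assumptions for this draft; replace or drop them once the
# ma-denv CSV format and the actual validation rules are fixed.


def _fetch_uniprotkb_sequence(uniprot_ac):
    """Fetch the current sequence for a UniProtKB AC (assumed REST API)."""
    # local import while the module-level 'import requests' above is still
    # commented out
    import requests

    rspns = requests.get(
        f"https://rest.uniprot.org/uniprotkb/{uniprot_ac}.fasta", timeout=60
    )
    if not rspns.ok:
        _abort_msg(f"Could not fetch UniProtKB entry '{uniprot_ac}'.")
    # FASTA: first line is the header, the remaining lines are sequence
    return "".join(rspns.text.splitlines()[1:])


def _check_seqres_vs_uniprot(seqres, uniprot_ac, up_range):
    """Compare a model SEQRES with the UniProtKB sequence within up_range.

    up_range is a (start, end) tuple in 1-based UniProtKB numbering. Returns
    a list of (type, short, long) tuples in the style used for the issues
    JSON elsewhere in this script.
    """
    issues = []
    up_seq = _fetch_uniprotkb_sequence(uniprot_ac)
    up_start, up_end = up_range
    if up_start < 1 or up_end > len(up_seq):
        issues.append(
            ("up_range", f"{uniprot_ac}: bad range {up_range}", len(up_seq))
        )
        return issues
    up_sub = up_seq[up_start - 1 : up_end]
    if len(up_sub) != len(seqres):
        issues.append(
            ("up_len_diff", uniprot_ac, (len(up_sub), len(seqres)))
        )
    mismatches = [
        (i + 1, mdl_olc, up_olc)
        for i, (mdl_olc, up_olc) in enumerate(zip(seqres, up_sub))
        if mdl_olc != up_olc
    ]
    if mismatches:
        issues.append(("up_mismatches", len(mismatches), mismatches))
    return issues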