diff --git a/projects/2024-12-ma-denv/translate2modelcif.py b/projects/2024-12-ma-denv/translate2modelcif.py index daf668bf1087841f9974d9bb53d19c6f13df6761..aa3a49baf05b6b4f5bccdde84ea9563b86eebad6 100644 --- a/projects/2024-12-ma-denv/translate2modelcif.py +++ b/projects/2024-12-ma-denv/translate2modelcif.py @@ -13,37 +13,21 @@ Expected output in ./modelcif for example above: - ma-taas-0272-issues.json listing issues for that conversion (if any) """ +from datetime import date import argparse +import csv +import gzip import os import sys -# ToDo: NEEDED? import datetime -# ToDo: NEEDED? import gzip -# ToDo: NEEDED? import shutil -# ToDo: NEEDED? import zipfile -# ToDo: NEEDED? import pickle -# ToDo: NEEDED? import re -# ToDo: NEEDED? import traceback +# For whatever reason, 'no-name-in-module' can not be silenced by config atm +# pylint: disable=no-name-in-module +from gemmi import cif -# ToDo: NEEDED? from io import StringIO -# ToDo: NEEDED? from timeit import default_timer as timer -# ToDo: NEEDED? import numpy as np -# ToDo: NEEDED? import requests -# ToDo: NEEDED? import ujson as json +# pylint: enable=no-name-in-module +import gemmi -# ToDo: NEEDED? import ihm -# ToDo: NEEDED? import ihm.citations - -# ToDo: NEEDED? import modelcif -# ToDo: NEEDED? import modelcif.associated -# ToDo: NEEDED? import modelcif.dumper -# ToDo: NEEDED? import modelcif.model -# ToDo: NEEDED? import modelcif.protocol -# ToDo: NEEDED? import modelcif.reference -# ToDo: NEEDED? import gemmi - -# ToDo: NEEDED? import pandas as pd -# ToDo: NEEDED? from ost import io, seq +from uniprotkb import UniProtKBEntryCache ################################################################################ @@ -55,11 +39,6 @@ def _abort_msg(msg, exit_code=1): sys.exit(exit_code) -# def _warn_msg(msg): -# """Write a warning message to stdout.""" -# print(f"WARNING: {msg}") - - def _check_file(file_path): """Make sure a file exists and is actually a file.""" if not os.path.exists(file_path): @@ -68,247 +47,22 @@ def _check_file(file_path): _abort_msg(f"File path does not point to file: '{file_path}'.") -# def _check_folder(dir_path): -# """Make sure a file exists and is actually a file.""" -# if not os.path.exists(dir_path): -# _abort_msg(f"Path not found: '{dir_path}'.") -# if not os.path.isdir(dir_path): -# _abort_msg(f"Path does not point to a directory: '{dir_path}'.") - - -# def _check_opts_folder(dir_path): -# """Remove trailing '/' (return fixed one) and check if path valid.""" -# if dir_path.endswith("/"): -# dir_path = dir_path[:-1] -# _check_folder(dir_path) -# return dir_path - - -# def _get_res_num(r, use_auth=False): -# """Get res. num. from auth. IDs if reading from mmCIF files.""" -# if use_auth: -# return int(r.GetStringProp("pdb_auth_resnum")) -# return r.number.num - - -# def _get_ch_name(ch, use_auth=False): -# """Get chain name from auth. IDs if reading from mmCIF files.""" -# if use_auth: -# return ch.GetStringProp("pdb_auth_chain_name") -# return ch.name - - -# def _get_sequence(chn, use_auth=False): -# """Get the sequence out of an OST chain incl. '-' for gaps in resnums.""" -# # initialise (add gaps if first is not at num. 
1) -# lst_rn = _get_res_num(chn.residues[0], use_auth) -# idx = 1 -# sqe = "-" * (lst_rn - 1) + chn.residues[0].one_letter_code -# -# for res in chn.residues[idx:]: -# lst_rn += 1 -# while lst_rn != _get_res_num(res, use_auth): -# sqe += "-" -# lst_rn += 1 -# sqe += res.one_letter_code -# return sqe - - -# class ZipFileHandler: -# """ -# A class to handle ZIP files, including nested ZIP files, allowing for -# listing files and extracting or reading files on demand. -# File names are represented as tuples where all besides the last item -# are expected to be nested paths to ZIP files. - -# Attributes: -# zip_path (str): The path to the main ZIP file. -# file_list (list): List of tuples representing the hierarchical ... -# """ - -# def __init__(self, zip_path): -# """ -# Initializes the ZipFileHandler with the path to the main ZIP file. - -# Args: -# zip_path (str): The path to the main ZIP file. -# """ -# self.zip_path = zip_path -# self.file_list = self._build_file_list() - -# def _build_file_list(self, zip_file=None, base_path=()): -# """ -# Recursively builds a list representing the hierarchical file structure -# within the ZIP file. - -# Args: -# zip_file (zipfile.ZipFile, optional): The current ZIP file ... -# If None, the main ZIP file is processed. Default is None. -# base_path (tuple): The base path within the hierarchical ... - -# Returns: -# list of tuples representing the hierarchical file structure. -# """ -# file_list = [] -# if zip_file is None: -# with zipfile.ZipFile(self.zip_path, "r") as zip_ref: -# file_list.extend(self._build_file_list(zip_ref)) -# else: -# for file_info in zip_file.infolist(): -# file_name = file_info.filename -# if file_name.endswith("/"): -# continue # Skip directories -# if file_name.endswith(".zip"): -# with zip_file.open(file_name) as nested_zip: -# with zipfile.ZipFile(nested_zip) as nested_zip_ref: -# nested_base_path = base_path + (file_name,) -# file_list.extend( -# self._build_file_list( -# nested_zip_ref, nested_base_path -# ) -# ) -# else: -# file_list.append(base_path + (file_name,)) -# return file_list - -# def _read_all_files(self, zip_file=None, base_path=()): -# """ -# Recursively builds a list representing the hierarchical file structure -# within the ZIP file and reads all files. - -# Args: -# zip_file (zipfile.ZipFile, optional): The current ZIP file ... -# If None, the main ZIP file is processed. Default is None. -# base_path (tuple): The base path within the hierarchical ... - -# Returns: -# dictionary with key matching tuples in self.file_list and -# value being raw read content of file as in self.read_file. -# """ -# file_list = {} -# if zip_file is None: -# with zipfile.ZipFile(self.zip_path, "r") as zip_ref: -# file_list.update(self._read_all_files(zip_ref)) -# else: -# for file_name in zip_file.namelist(): -# if file_name.endswith("/"): -# continue # Skip directories -# if file_name.endswith(".zip"): -# with zip_file.open(file_name) as nested_zip: -# with zipfile.ZipFile(nested_zip) as nested_zip_ref: -# nested_base_path = base_path + (file_name,) -# file_list.update( -# self._read_all_files( -# nested_zip_ref, nested_base_path -# ) -# ) -# else: -# file_list[base_path + (file_name,)] = zip_file.open( -# file_name -# ).read() -# return file_list - -# def _zip_parser(self, current_zip, filename_tuple, callback): -# """ -# Hierarchical parsing of ZIP file through filename_tuple. -# Calls callback(zip_ref, name) at final nesting level with -# zip_ref being the open ZipFile and name the filename within it. 
-# """ -# with zipfile.ZipFile(current_zip, "r") as zip_ref: -# name = filename_tuple[0] -# if len(filename_tuple) == 1: -# return callback(zip_ref, name) -# else: -# with zip_ref.open(name) as nested_zip: -# return self._zip_parser( -# nested_zip, filename_tuple[1:], callback -# ) - -# def extract_file(self, filename_tuple, extract_to="."): -# """ -# Extracts a specific file from the ZIP file based on the ... -# Note that the extracted file path will be according to ... -# within the extract_to folder. - -# Args: -# filename_tuple (tuple): A tuple representing the hierarchical ... -# extract_to (str): The directory to extract the file to. ... - -# Raises: -# FileNotFoundError: If the specified file is not found in the ... -# """ -# self._zip_parser( -# self.zip_path, -# filename_tuple, -# lambda zip_ref, name: zip_ref.extract(name, path=extract_to), -# ) - -# def extract_file_to_path(self, filename_tuple, output_path): -# """ -# ... -# """ - -# def _extract_to_path(zip_ref, name): -# with zip_ref.open(name) as source, open( -# output_path, "wb" -# ) as target: -# shutil.copyfileobj(source, target) - -# self._zip_parser(self.zip_path, filename_tuple, _extract_to_path) - -# def read_file(self, filename_tuple): -# """ -# Reads and returns the content of a specific file from the ZIP ... - -# Args: -# filename_tuple (tuple): A tuple representing the hierarchical ... - -# Returns: -# bytes: The content of the specified file. - -# Raises: -# FileNotFoundError: If the specified file is not found in the ... -# """ -# return self._zip_parser( -# self.zip_path, -# filename_tuple, -# lambda zip_ref, name: zip_ref.open(name).read(), -# ) - -# def read_all_files(self): -# """ -# Recursively builds a list representing the hierarchical file structure -# within the ZIP file and reads all files. - -# NOTE: for large zip files with data in nested zip file, this is much, -# much faster than calling read_file individually! - -# Returns: -# dictionary with key matching tuples in self.file_list and -# value being raw read content of file as in self.read_file. -# """ -# return self._read_all_files() - -# def process_file(self, filename_tuple, func): -# """ -# Opens a specific file from the ZIP file based on the hierarchical path -# and returns the result of func(fo) where fo is a file-like object. - -# Args: -# filename_tuple (tuple): A tuple representing the hierarchical ... -# func (function): Function to call on file-like object. +def _check_folder(dir_path, create): + """Make sure a file exists and is actually a file.""" + if not os.path.exists(dir_path): + if not create: + _abort_msg(f"Path not found: '{dir_path}'.") + os.makedirs(dir_path, exist_ok=True) + if not os.path.isdir(dir_path): + _abort_msg(f"Path does not point to a directory: '{dir_path}'.") -# Returns: -# Result of func. -# Raises: -# FileNotFoundError: If the specified file is not found in the ... -# """ -# return self._zip_parser( -# self.zip_path, -# filename_tuple, -# lambda zip_ref, name: func(zip_ref.open(name)), -# ) +def _check_opts_folder(dir_path, create=False): + """Remove trailing '/' (return fixed one) and check if path valid.""" + if dir_path.endswith("/"): + dir_path = dir_path[:-1] + _check_folder(dir_path, create=create) + return dir_path ################################################################################ @@ -336,7 +90,8 @@ def _parse_args(): type=str, metavar="<MODEL INFO CSV FILE>", help="Path to a CSV file with information about each model. 
Format: " - + "<FILENAME>,<UNP AC>,?,?,?,?,?", + + "<FILENAME>,<UNP AC>,<UNP ALIGNMENT START>,<UNP ALIGNMENT END>," + + "<ENTITY DESCRIPTION>,<MODEL TITLE>,<MODEL DESCRIPTION>", ) parser.add_argument( "out_dir", @@ -357,26 +112,23 @@ def _parse_args(): ) parser.add_argument( "--compress", + "-c", default=False, action="store_true", help="Compress ModelCIF file with gzip.", ) - # ToDo: do we want checks-only? - # parser.add_argument( - # "--checks-only", - # default=False, - # action="store_true", - # help="Only check for issues without producing ModelCIF files.", - # ) + parser.add_argument( + "--checks-only", + default=False, + action="store_true", + help="Only check for issues without producing ModelCIF files.", + ) opts = parser.parse_args() # check input _check_file(opts.modelcif) _check_file(opts.model_info_csv) - if opts.out_dir.endswith("/"): - opts.out_dir = opts.out_dir[:-1] - if not os.path.exists(opts.out_dir): - os.makedirs(opts.out_dir, exist_ok=True) + _check_opts_folder(opts.out_dir, create=True) # check if the ModelCIF file already exists in out_dir out_file = os.path.join( opts.out_dir, @@ -391,2129 +143,306 @@ def _parse_args(): return opts -# def _get_audit_authors(): -# """Return the list of authors that produced this model.""" -# return ( -# "Dobbelstein, Adrian", -# "Alva, Vikram", -# ) - - -# def _get_file_list_dict(file_list): -# """ -# Get dictionary for access to files for conversion. - -# Args: -# file_list (list): As returned by ZipFileHandler. - -# Returns: -# dict with following keys / values: -# - "mdl_type" / "assembly" or "domain" -# - "json_file" / filename_tuple for access in ZipFileHandler -# - "cif_file" / filename_tuple for access in ZipFileHandler -# - "image_file" / filename_tuple for access in ZipFileHandler -# - "accompanying_data" / dict with -# filename without path as key and -# filename_tuple for access in ZipFileHandler as value -# - "unparsed_files" / list of filename_tuple -# """ -# result = { -# "mdl_type": None, -# "json_file": None, -# "cif_file": None, -# "image_file": None, -# "accompanying_data": {}, -# "unparsed_files": [], -# } -# for filename_tuple in file_list: -# basename = os.path.basename(filename_tuple[-1]) -# fs = filename_tuple[0].split(os.path.sep) -# if fs[-2] == "accompanying_data" or fs[-1] == "accompanying_data.zip": -# assert basename not in result["accompanying_data"] -# result["accompanying_data"][basename] = filename_tuple -# elif basename == "assembly_info.json": -# assert result["mdl_type"] in [None, "assembly"] -# assert result["json_file"] is None -# result["mdl_type"] = "assembly" -# result["json_file"] = filename_tuple -# elif basename == "assembly.cif": -# assert result["mdl_type"] in [None, "assembly"] -# assert result["cif_file"] is None -# result["mdl_type"] = "assembly" -# result["cif_file"] = filename_tuple -# elif basename == "domain_info.json": -# assert result["mdl_type"] in [None, "domain"] -# assert result["json_file"] is None -# result["mdl_type"] = "domain" -# result["json_file"] = filename_tuple -# elif basename == "domain.cif": -# assert result["mdl_type"] in [None, "domain"] -# assert result["cif_file"] is None -# result["mdl_type"] = "domain" -# result["cif_file"] = filename_tuple -# elif basename == "image.png": -# assert result["image_file"] is None -# result["image_file"] = filename_tuple -# else: -# result["unparsed_files"].append(filename_tuple) -# for k, v in result.items(): -# has_content = bool(v) -# if k != "unparsed_files" and not has_content: -# raise RuntimeError(f"{k} not set in 
zip file.") -# return result - - -# def _get_zip_content(zip_file_path): -# """Same as _get_file_list_dict but reads zip file content.""" -# zip_handler = ZipFileHandler(zip_file_path) -# zip_dict = _get_file_list_dict(zip_handler.file_list) -# zip_content = zip_handler.read_all_files() -# for k, v in zip_dict.items(): -# if k == "accompanying_data": -# v = {ak: zip_content[av] for ak, av in v.items()} -# elif k in ["json_file", "cif_file", "image_file"]: -# v = zip_content[v] -# zip_dict[k] = v -# return zip_dict - - -# def _get_modeller_software(version=None): -# """Get MODELLER as a dictionary, suitable to create a modelcif software -# object.""" -# # Note: depositor had suggested use of Webb and Sali 2016 but decided -# # against in given input from Ben Webb (1993 citation is still the ... -# # one: https://salilab.org/modeller/manual/node8.html). -# return { -# "name": "MODELLER", -# "classification": "model building", -# "description": "Comparative modeling by satisfaction of spatial " -# "restraints", -# "citation": ihm.citations.modeller, -# "location": "https://salilab.org/modeller/", -# "type": "program", -# "version": version, -# } - - -# # global definition to avoid duplicated entries in ModelCIF -# ihm.citations.af2_multimer = ihm.Citation( -# pmid=None, -# title="Protein complex prediction with AlphaFold-Multimer.", -# journal="bioRxiv", -# volume=None, -# page_range=None, -# year=2021, -# authors=[ -# "Evans, R.", -# "O'Neill, M.", -# "Pritzel, A.", -# "Antropova, N.", -# "Senior, A.", -# "Green, T.", -# "Zidek, A.", -# "Bates, R.", -# "Blackwell, S.", -# "Yim, J.", -# "Ronneberger, O.", -# "Bodenstein, S.", -# "Zielinski, M.", -# "Bridgland, A.", -# "Potapenko, A.", -# "Cowie, A.", -# "Tunyasuvunakool, K.", -# "Jain, R.", -# "Clancy, E.", -# "Kohli, P.", -# "Jumper, J.", -# "Hassabis, D.", -# ], -# doi="10.1101/2021.10.04.463034", -# ) - - -# def _get_af2_software(version=None, is_multimer=False): -# """Get AF2 as dictionary, suitable to create a modelcif software ...""" -# if is_multimer: -# return { -# "name": "AlphaFold-Multimer", -# "classification": "model building", -# "description": "Structure prediction", -# "citation": ihm.citations.af2_multimer, -# "location": "https://github.com/deepmind/alphafold", -# "type": "package", -# "version": version, -# } -# else: -# return { -# "name": "AlphaFold", -# "classification": "model building", -# "description": "Structure prediction", -# "citation": ihm.citations.alphafold2, -# "location": "https://github.com/deepmind/alphafold", -# "type": "package", -# "version": version, -# } - - -# def _get_dssp_software(version=None): -# """Get DSSP SW as dictionary, suitable to create a modelcif software -# object.""" -# # NOTE: this is using the old repo and old citation (ok for v2.3) -# return { -# "name": "DSSP", -# "classification": "data collection", -# "description": "Secondary structure assignment", -# "citation": ihm.Citation( -# pmid="6667333", -# title="Dictionary of protein secondary structure: pattern " -# + "recognition of hydrogen-bonded and geometrical features.", -# journal="Biopolymers", -# volume=22, -# page_range=(2577, 2637), -# year=1983, -# authors=["Kabsch, W.", "Sander, C."], -# doi="10.1002/bip.360221211", -# ), -# "location": "https://github.com/cmbi/dssp", -# "type": "program", -# "version": version, -# } - - -# def _get_foldseek_software(version=None): -# """Get Foldseek SW as dictionary, suitable to create a modelcif software -# object.""" -# return { -# "name": "Foldseek", -# "classification": "data 
collection", -# "description": "Protein structure search", -# "citation": ihm.Citation( -# pmid="37156916", -# title="Fast and accurate protein structure search with Foldseek.", -# journal="Nat Biotechnol", -# volume=42, -# page_range=(243, 246), -# year=2024, -# authors=[ -# "van Kempen, M.", -# "Kim, S.S.", -# "Tumescheit, C.", -# "Mirdita, M.", -# "Lee, J.", -# "Gilchrist, C.L.M.", -# "Soeding, J.", -# "Steinegger, M.", -# ], -# doi="10.1038/s41587-023-01773-0", -# ), -# "location": "https://search.foldseek.com/search", -# "type": "package", -# "version": version, -# } - - -# def _get_sam_cc_software(version=None): -# """Get SamCC-Turbo SW as dictionary, suitable to create a modelcif ... -# object.""" -# return { -# "name": "SamCC-Turbo", -# "classification": "data collection", -# "description": "Detection and measurement of coiled coils", -# "citation": ihm.Citation( -# pmid="33325494", -# title="A library of coiled-coil domains: from regular bundles to " -# + "peculiar twists.", -# journal="Bioinformatics", -# volume=36, -# page_range=(5368, 5376), -# year=2020, -# authors=[ -# "Szczepaniak, K.", -# "Bukala, A.", -# "da Silva Neto, A.M.", -# "Ludwiczak, J.", -# "Dunin-Horkawicz, S.", -# ], -# doi="10.1093/bioinformatics/btaa1041", -# ), -# "location": "https://github.com/labstructbioinf/samcc_turbo", -# "type": "package", -# "version": version, -# } - - -# def _get_us_align_software(version=None): -# """Get US-align SW as dictionary, suitable to create a modelcif software -# object.""" -# return { -# "name": "US-align", -# "classification": "data collection", -# "description": "Universal Structural alignment of macromolecules", -# "citation": ihm.Citation( -# pmid="36038728", -# title="US-align: universal structure alignments of proteins, " -# + "nucleic acids, and macromolecular complexes.", -# journal="Nat Methods", -# volume=19, -# page_range=(1109, 1115), -# year=2022, -# authors=["Zhang, C.", "Shine, M.", "Pyle, A.M.", "Zhang, Y."], -# doi="10.1038/s41592-022-01585-1", -# ), -# "location": "https://zhanggroup.org/US-align/", -# "type": "package", -# "version": version, -# } - - -# def _get_af2_description(pred): -# """Get description text based on parameters used.""" -# if pred["version"] == "2.1.2": -# num_models_str = "producing 5 models with 3 recycles" -# elif pred["version"] == "2.3.1": -# num_models_str = ( -# "producing 10 models (2 random seeds per parameter " -# "set) with up to 20 recycles" -# ) -# else: -# raise RuntimeError(f"Unexpected AF2 version: '{pred['version']}'") -# return ( -# f"using AlphaFold v{pred['version']} {num_models_str} and " -# f"{pred['num_ensemble']} ensemble each, with AMBER relaxation, " -# f"using templates up to date {pred['tmplate_cutoff_date']}, " -# f"ranked by ipTM*0.8+pTM*0.2, starting from MSAs with reduced_dbs " -# f"setting" -# ) - - -# def _get_af2_merge_dict(pred): -# """Get N/C-terminal padding lengths for artificial coiled-coils.""" -# pred_name = os.path.splitext(pred["prediction_p"])[0] -# ps = pred_name.split("_") -# pred_merge = {"N": 0, "C": 0} -# if "mergeN" in ps: -# pred_merge["N"] = int(ps[ps.index("mergeN") + 1]) -# if "mergeC" in ps: -# pred_merge["C"] = int(ps[ps.index("mergeC") + 1]) -# return pred_merge - - -# def _get_af2_range_string(pred): -# """Get string repr. of modelled range incl. 
coiled-coil padding.""" -# range_str = f"{pred['prediction_start']}-{pred['prediction_end']}" -# pred_merge = _get_af2_merge_dict(pred) -# if pred_merge["N"] or pred_merge["C"]: -# if pred_merge["N"] and pred_merge["C"]: -# seq_str = "sequences" -# else: -# seq_str = "sequence" -# pad_strs = [] -# if pred_merge["N"]: -# pad_strs.append(f"{pred_merge['N']} N-terminal residues") -# if pred_merge["C"]: -# pad_strs.append(f"{pred_merge['C']} C-terminal residues") -# pad_str = ", ".join(pad_strs) -# range_str += ( -# f" padded with artificial coiled-coil {seq_str} " -# f"({pad_str}) to improve oligomerization" -# ) -# return range_str - - -# def _get_preds_and_parts(json_dict, mdl_type): -# """Get list of AF2 pred. and trucated parts in a consistent way. -# For domains, the final model is the truncated part and gets "model" -# as truncated_prediction_path. -# """ -# if mdl_type == "assembly": -# af2_preds = json_dict["AF_predictions"] -# trunc_parts = json_dict["assembly_parts"] -# elif mdl_type == "domain": -# af2_pred = json_dict["source_AF_prediction"] -# af2_preds = [af2_pred] -# trunc_parts = [ -# { -# "part_start": json_dict["domain_sequence_start"], -# "part_end": json_dict["domain_sequence_end"], -# "truncated_prediction_path": "model", -# "source_prediction": os.path.splitext( -# af2_pred["prediction_p"] -# )[0], -# } -# ] -# else: -# raise RuntimeError(f"Unexpected mdl_type: '{mdl_type}'") -# # sort them by range -# af2_preds.sort( -# key=lambda pred: ( -# int(pred["prediction_start"]), -# int(pred["prediction_end"]), -# ) -# ) -# trunc_parts.sort(key=lambda part: (part["part_start"], part["part_end"])) -# return af2_preds, trunc_parts - - -# def _get_af2_files(af2_path): -# """Return names for PDB file, PKL file, and PAE file.""" -# af_base = os.path.splitext(af2_path)[0] -# return af2_path, f"{af_base}_info.pkl", f"PAE_{af_base}.png" - - -# def _get_protocol_steps_and_software(json_dict, mdl_type, mdl_id): -# """Create the list of protocol steps with software and parameters used.""" -# protocol = [] - -# # LOGIC: -# # - First steps are AF2 predictions with different settings -# # -> each setting has a number of ranges -# # -> if only one AF pred., the text gets simplified -# # - Next step: models truncated (and superposed) into parts -# # -> one step for all ranges -# # -> no superpos. if only one part -# # - For assemblies with more than one part: -# # -> MODELLER step -# # -> QE step combining pLDDT and MODELLER energy score -# # Note: sanity checks all done in _do_sanity_checks and not here! 
- -# # collect info on predictions and parts -# af2_preds, trunc_parts = _get_preds_and_parts(json_dict, mdl_type) - -# # group AF2 predictions by SW config -# af2_config_preds = {} # dict = version, tmplate_cutoff_date, num_ensemble -# for pred in af2_preds: -# config_tuple = ( -# pred["version"], -# pred["tmplate_cutoff_date"], -# pred["num_ensemble"], -# ) -# if config_tuple not in af2_config_preds: -# af2_config_preds[config_tuple] = [pred] -# else: -# af2_config_preds[config_tuple].append(pred) - -# # create AF2 steps -# for config_tuple in sorted(af2_config_preds): -# version, max_template_date, num_ensemble = config_tuple -# sw_params = { -# "model_preset": "multimer", -# "db_preset": "reduced_dbs", -# "max_template_date": max_template_date, -# "num_ensemble": num_ensemble, -# } -# if version == "2.3.1": -# sw_params["num_multimer_predictions_per_model"] = 2 -# sw_plus_params = [ -# (_get_af2_software(version=version, is_multimer=True), sw_params) -# ] -# # build up text -# cur_preds = af2_config_preds[config_tuple] -# ranges_txt = ", ".join( -# [_get_af2_range_string(pred) for pred in cur_preds] -# ) -# af_txt = _get_af2_description(cur_preds[0]) # all cur_preds the same -# # shortened text for single pred. -# if len(af2_preds) == 1: -# details = ( -# f"AlphaFold-Multimer prediction {af_txt} for range " -# f"{ranges_txt}." -# ) -# else: -# details = ( -# f"AlphaFold-Multimer prediction of overlapping, " -# f"partial sequences {af_txt}. The following residue " -# f"ranges were predicted: {ranges_txt}." -# ) -# # collect output files -# output = [] -# for pred in cur_preds: -# output.extend(_get_af2_files(pred["prediction_p"])) -# protocol.append( -# { -# "method_type": "modeling", -# "name": "AF2", -# "details": details, -# "input": [ -# f"target_sequences_{mdl_id}", -# f"ref_dbs_AF2_{version}", -# ], -# "output": output, -# "software_plus_params": sw_plus_params, -# } -# ) - -# # create truncation step -# ranges_txt = ", ".join( -# [f"{part['part_start']}-{part['part_end']}" for part in trunc_parts] -# ) -# if len(trunc_parts) == 1: -# details = f"Model truncated to predicted domain range {ranges_txt}." -# else: -# details = ( -# f"Models truncated and superposed based on the following " -# f"overlapping residue ranges: {ranges_txt}." 
-# ) -# trunc_data = [part["truncated_prediction_path"] for part in trunc_parts] -# # add MDL ID suffix if needed -# trunc_data = [ -# (f"model_{mdl_id}" if td == "model" else td) for td in trunc_data -# ] -# protocol.append( -# { -# "method_type": "other", -# "name": "Truncation", -# "details": details, -# "input": sorted( -# set( -# [ -# f"{part['source_prediction']}.pdb" -# for part in trunc_parts -# ] -# ) -# ), -# "output": trunc_data, -# "software_plus_params": [], -# } -# ) - -# # create other steps to combine truncated parts -# if len(trunc_parts) > 1: -# # MODELLER step -# protocol.append( -# { -# "method_type": "modeling", -# "name": "MODELLER", -# "details": "Assembly of the truncated models using MODELLER " -# 'executed using the "AutoModel" configuration.', -# "input": trunc_data, -# "output": [f"model_{mdl_id}"], -# "software_plus_params": [(_get_modeller_software("10.4"), -# {})], -# } -# ) -# # QE step -# protocol.append( -# { -# "method_type": "other", -# "name": "QE", -# "details": "Calculation of combined per-residue confidence " -# "metric (range 0-100, higher is better) based on " -# "per-residue AlphaFold confidence score (pLDDT) " -# "(range 0-100, higher is better, for residues with " -# "overlapping predictions, the higher pLDDT score is " -# "used) and per-residue MODELLER energy score (>0, " -# "lower is better) using the following formula: " -# "confidence = clip(0,100, pLDDT - 60 / (1 + exp((150 " -# "- energy)/20))). This score reduces the pLDDT score " -# "by max. 60 (for energy values >> 150). For energy " -# "values < 100, the score approximately follows the " -# "pLDDT score.", -# "input": [f"model_{mdl_id}"], -# "output": [f"model_{mdl_id}"], -# "software_plus_params": [], -# } -# ) -# if "software_used_for_domain_annotation" in json_dict: -# # domain annotation step -# protocol.append( -# { -# "method_type": "other", -# "name": "domain annotation", -# "details": "..." -# "the following tools: DSSP, SamCC-Turbo, Foldseek, " -# "US-align.", -# "input": [f"model_{mdl_id}"], -# "output": [f"model_{mdl_id}"], -# "software_plus_params": [ -# (_get_dssp_software("2.3.0"), {}), -# (_get_sam_cc_software(), {}), -# (_get_foldseek_software(), {}), -# (_get_us_align_software(), {}), -# ], -# } -# ) - -# return protocol - - -# def _process_ent(ent): -# """Helper to process OST entities for sanity checks. -# Returns: -# - ch_names: list of chain names in order as appearing in file -# - mdlsqe: atomseq (no gaps) for model -# -> incl. assertion that all chains have same seq. -# - resnum_range: (min. res. num., max. res. num.) -# -> incl. assertion that res. num. are continuous and w/o gaps -# - ent_bfs: b-factor for each residue -# -> incl. assertion that all atoms of res. have same -# -> list of length len(ch_names) * len(mdlsqe) -# """ -# # chain names in order -# ch_names = [ch.name for ch in ent.chains] -# # sequence -# unique_atomseqs = sorted( -# set( -# [ -# "".join(res.one_letter_code for res in ch.residues) -# for ch in ent.chains -# ] -# ) -# ) -# assert len(unique_atomseqs) == 1 -# mdlsqe = unique_atomseqs[0] -# # res. 
nums (required to be continuous) -# resnum_ranges = [] -# for ch in ent.chains: -# res_nums = [res.number.num for res in ch.residues] -# assert res_nums == list(range(min(res_nums), max(res_nums) + 1)) -# resnum_ranges.append((min(res_nums), max(res_nums))) -# unique_resnum_ranges = sorted(set(resnum_ranges)) -# assert len(unique_resnum_ranges) == 1 -# resnum_range = unique_resnum_ranges[0] -# # b-factors -# ent_bfs = [] -# for res in ent.residues: -# b_factors = [a.b_factor for a in res.atoms] -# assert len(set(b_factors)) == 1 # must all be equal! -# ent_bfs.append(b_factors[0]) -# assert len(ent_bfs) == len(ch_names) * len(mdlsqe) -# return ch_names, mdlsqe, resnum_range, ent_bfs - - -# def _load_cif_file(zip_dict): -# """Read CIF file for given entry. -# Returns OST entity and dictionary of categories to put back into resulting -# ModelCIF file with gemmi (categories not supported by python-modelcif). -# """ -# # fetch file and fix for OST -# cif_str = zip_dict["cif_file"] -# doc = gemmi.cif.read_string(cif_str) -# block = doc.sole_block() -# cat_dict = block.get_mmcif_category("_struct_ref.") -# if cat_dict and "db_code" not in cat_dict: -# if "pdbx_db_accession" in cat_dict: -# cat_dict["db_code"] = cat_dict["pdbx_db_accession"] -# else: -# lengths = [len(v) for v in cat_dict.values()] -# assert len(set(lengths)) == 1 -# cat_dict["db_code"] = [""] * lengths[0] -# block.set_mmcif_category("_struct_ref.", cat_dict) -# # kept mmCIF data: to check consistency later -# kept_cif_cats = {} -# for cat in [ -# "_pdbx_domain.", -# "_pdbx_feature_domain.", -# "_pdbx_domain_range.", -# ]: -# cat_dict = block.get_mmcif_category(cat) -# if cat_dict: -# kept_cif_cats[cat] = cat_dict -# # fix data -# if len(kept_cif_cats) == 3: -# # fix IDs if possible -# pd_ids = kept_cif_cats["_pdbx_domain."]["id"] -# pfd_ids = kept_cif_cats["_pdbx_feature_domain."]["id"] -# pfd_dom_ids = kept_cif_cats["_pdbx_feature_domain."]["domain_id"] -# pdr_dom_ids = kept_cif_cats["_pdbx_domain_range."]["domain_id"] -# if ( -# pdr_dom_ids != pd_ids -# and pfd_ids == pd_ids -# and pfd_dom_ids == pdr_dom_ids -# ): -# kept_cif_cats["_pdbx_feature_domain."]["domain_id"] = pd_ids -# kept_cif_cats["_pdbx_domain_range."]["domain_id"] = pd_ids -# # fix characters in expected places -# exp_cat_items = [ -# "_pdbx_domain.details", -# "_pdbx_feature_domain.feature", -# ] -# for cat, cat_dict in kept_cif_cats.items(): -# for item in cat_dict.keys(): -# if f"{cat}{item}" in exp_cat_items: -# cat_dict[item] = [ -# value.replace("°-", " degree ") -# .replace("°", " degree") -# .replace("αβ", "alpha/beta") -# .replace("α", "alpha") -# .replace("β", "beta") -# .replace("“", '"') -# .replace("→", " to ") -# for value in cat_dict[item] -# ] -# # get OST ent. 
-# cif_ent, info, ss = io.MMCifStrToEntity( -# doc.as_string(), profile=io.profiles["DEFAULT"], process=True -# ) -# return cif_ent, kept_cif_cats - - -# def _do_sanity_checks(mdl_id, json_dict, zip_dict, cif_ent, kept_cif_cats): -# """Check if everything in order and return issues for weird cases.""" -# issues = [] - -# # get some general info depending on model type -# mdl_type = zip_dict["mdl_type"] -# plddt_len = len(json_dict["pLDDT"]) -# if mdl_type == "assembly": -# mdl_seq = json_dict["predicted_sequence"] -# seq_range = ( -# json_dict["predicted_sequence_start"], -# json_dict["predicted_sequence_end"], -# ) -# ent_desc = json_dict["source_sequence_description"] -# assert "software_used_for_domain_annotation" in json_dict -# assert len(json_dict["per_res_conf"]) == plddt_len -# else: -# assert "predicted_sequence" not in json_dict -# mdl_seq = json_dict["domain_sequence"] -# seq_range = ( -# json_dict["domain_sequence_start"], -# json_dict["domain_sequence_end"], -# ) -# label = f'domain/motif "{json_dict["domain_name"]}"' -# assert label in json_dict["abstract"] -# ent_desc = f'{json_dict["source_sequence_description"]} ({label})' -# assert "merge" not in \ -# json_dict["source_AF_prediction"]["prediction_p"] -# assert "software_used_for_domain_annotation" not in json_dict -# assert "per_res_conf" not in json_dict - -# # global checks -# assert sum((c in "XOUBJZ") for c in mdl_seq) == 0 -# src_seq = json_dict["source_sequence"] -# assert mdl_seq == src_seq[seq_range[0] - 1 : seq_range[1]] -# if len(mdl_seq) != plddt_len: -# short_data = (len(mdl_seq), plddt_len) -# long_data = (mdl_seq, json_dict["pLDDT"]) -# issues.append(("plddt_size_mismatch", short_data, long_data)) -# assert seq_range[1] > seq_range[0] -# if seq_range[1] > len(src_seq): -# short_data = (seq_range, len(src_seq)) -# issues.append(("range_source_mismatch", short_data, [])) -# for key in ["percent_confident_residues", "mean_per_res_conf"]: -# assert key in json_dict -# extra_keys = set(json_dict.keys()) - set( -# [ -# "AF_predictions", -# "MODELLER_energy_score", -# "abstract", -# "assembly_parts", -# "domain_name", -# "domain_sequence", -# "domain_sequence_end", -# "domain_sequence_start", -# "mean_per_res_conf", -# "pLDDT", -# "per_res_conf", -# "percent_confident_residues", -# "predicted_sequence", -# "predicted_sequence_end", -# "predicted_sequence_start", -# "software_used_for_domain_annotation", -# "software_used_for_prediction", -# "source_AF_prediction", -# "source_sequence", -# "source_sequence_RefSeq_ID", -# "source_sequence_description", -# "title", -# "source_sequence_download_date", -# "domain_id", -# "coiled_coil_annot", -# ] -# ) -# if len(extra_keys) > 0: -# issues.append(("extra_keys", sorted(extra_keys), [])) -# # unused/unknown coiled_coil_annot (expect to be None if there) -# assert json_dict.get("coiled_coil_annot") is None - -# # for valid mmCIF... -# mmcif_regex_ent = "[][ \t_(),.;:\"&<>/\\\\{}'`~!@#$%?+=*A-Za-z0-9|^-]*" -# mmcif_regex_desc = "[][ \n\t()_,.;:\"&<>/\\\\{}'`~!@#$%?+=*A-Za-z0-9|^-]*" -# assert bool(re.fullmatch(mmcif_regex_ent, ent_desc)) -# assert bool(re.fullmatch(mmcif_regex_desc, json_dict["title"])) -# assert bool(re.fullmatch(mmcif_regex_desc, json_dict["abstract"])) - -# # collect info on predictions and parts -# af2_preds, trunc_parts = _get_preds_and_parts(json_dict, mdl_type) - -# # check AF pred. 
-# exp_pred_keys = set( -# [ -# "prediction_start", -# "prediction_end", -# "prediction_p", -# "version", -# "tmplate_cutoff_date", -# "num_ensemble", -# "pTM", -# "ipTM", -# ] -# ) -# opt_pred_keys = set(["merged_GCN4linker_len_N", -# "merged_GCN4linker_len_C"]) -# pred_ranges = {} -# sw_kws_to_check = set() -# for pred in af2_preds: -# # check keys -# extra_keys = set(pred.keys()) - exp_pred_keys - opt_pred_keys -# assert len(extra_keys) == 0 -# noncovered_keys = exp_pred_keys - pred.keys() -# assert len(noncovered_keys) == 0 -# assert pred["version"] in pred["prediction_p"] -# # keep track of files and ranges for checks of parts -# pred_name = os.path.splitext(pred["prediction_p"])[0] -# assert pred_name not in pred_ranges -# pred_range = ( -# int(pred["prediction_start"]), -# int(pred["prediction_end"]), -# ) -# pred_ranges[pred_name] = pred_range -# # keep track of AF2 versions -# sw_kws_to_check.add(("AlphaFold", pred["version"])) -# # check merge stuff -# pred_merge = _get_af2_merge_dict(pred) -# dict_pred_merge = {"N": 0, "C": 0} -# if "merged_GCN4linker_len_N" in pred.keys(): -# dict_pred_merge["N"] = pred["merged_GCN4linker_len_N"] -# if "merged_GCN4linker_len_C" in pred.keys(): -# dict_pred_merge["C"] = pred["merged_GCN4linker_len_C"] -# assert pred_merge == dict_pred_merge -# # check acc. file content -# pdb_file, pkl_file, pae_file = _get_af2_files(pred["prediction_p"]) -# pdb_ent = io.PDBStrToEntity( -# zip_dict["accompanying_data"][pdb_file], -# profile=io.profiles["DEFAULT"], -# process=True, -# ) -# pkl_data = pickle.loads(zip_dict["accompanying_data"][pkl_file]) -# (pdb_ch_names, pdb_mdlsqe, pdb_resnum_range, pdb_ent_bfs) = ( -# _process_ent(pdb_ent) -# ) -# # pdb_ch_names can have random order and we don't care... -# # we ignore pdb_resnum_range and just check seq. 
-# exp_mdlsqe = src_seq[pred_range[0] - 1 : pred_range[1]] -# cut_end = len(pdb_mdlsqe) - pred_merge["C"] -# pdb_mdlsqe_cut = pdb_mdlsqe[pred_merge["N"] : cut_end] -# assert pdb_mdlsqe_cut == exp_mdlsqe -# # check QE -# exp_qe_len = len(pdb_ch_names) * len(pdb_mdlsqe) -# assert len(pkl_data["pLDDT"]) == exp_qe_len -# assert len(pkl_data["PAE"]) == exp_qe_len -# assert len(pkl_data["PAE"][0]) == exp_qe_len -# assert "pTM" in pkl_data -# assert "ipTM" in pkl_data -# qe_max_diff = np.max(abs(np.asarray(pkl_data["pLDDT"]) - pdb_ent_bfs)) -# if qe_max_diff > 0.01: -# # 2nd option: chain names in PDB were reordered compared to pkl -# bfs_arr = np.asarray(pdb_ent_bfs).reshape(len(pdb_ch_names), -1) -# ch_indices = [pdb_ch_names.index(ch) for ch in ["A", "C", "B"]] -# bfs_new = bfs_arr[ch_indices, :].flatten() -# qe_max_diff = np.max(abs(np.asarray(pkl_data["pLDDT"]) - bfs_new)) -# if qe_max_diff > 0.01: -# short_data = [qe_max_diff, pkl_file] -# long_data = [list(pkl_data["pLDDT"]), pdb_ent_bfs] -# issues.append(("pkl_plddt_diff", short_data, long_data)) -# # check redundant data -# assert str(pred["prediction_start"]) == pkl_data["pred_start"] -# assert str(pred["prediction_end"]) == pkl_data["pred_end"] -# assert pred["tmplate_cutoff_date"] == pkl_data["tmpl_max_date"] -# assert pred["version"] == pkl_data["version"] -# assert pred["num_ensemble"] == pkl_data["num_ensemble"] -# if pred["pTM"] != pkl_data["pTM"]: -# long_data = [pred["pTM"], float(pkl_data["pTM"])] -# short_data = [abs(long_data[0] - long_data[1]), pkl_file] -# issues.append(("pkl_ptm_diff", short_data, long_data)) -# if pred["ipTM"] != pkl_data["ipTM"]: -# long_data = [pred["ipTM"], float(pkl_data["ipTM"])] -# short_data = [abs(long_data[0] - long_data[1]), pkl_file] -# issues.append(("pkl_iptm_diff", short_data, long_data)) -# if "merged_GCN4linker_len_N" in pred: -# if "merge_len_N" in pkl_data.keys(): -# other_len = pkl_data["merge_len_N"] -# else: -# # HACK for 0898 -# assert "merge_len" in pkl_data.keys() -# other_len = pkl_data["merge_len"] -# assert pred["merged_GCN4linker_len_N"] == other_len -# if "merged_GCN4linker_len_C" in pred: -# assert pred["merged_GCN4linker_len_C"] == pkl_data["merge_len_C"] -# # check expected fixed data -# assert pkl_data["max_msa_size"] == 10000 -# assert pkl_data["db_preset"] == "reduced_dbs" -# assert pkl_data["amber_relax"] == True - -# # check truncated parts -# if len(trunc_parts) > 1: -# assert len(json_dict["MODELLER_energy_score"]) == plddt_len -# sw_kws_to_check.add(("MODELLER", "10.4")) -# else: -# assert "MODELLER_energy_score" not in json_dict -# exp_part_keys = sorted( -# [ -# "part_start", -# "part_end", -# "truncated_prediction_path", -# "source_prediction", -# ] -# ) -# part_names = [] -# source_predictions = [] -# for part in trunc_parts: -# # check metadata -# assert sorted(part.keys()) == exp_part_keys -# source_predictions.append(part["source_prediction"]) -# part_names.append(part["truncated_prediction_path"]) -# pred_range = pred_ranges[part["source_prediction"]] -# assert part["part_start"] >= pred_range[0] -# assert part["part_end"] <= pred_range[1] -# # check acc. 
file content -# part_path = part["truncated_prediction_path"] -# if part_path != "model": -# pdb_ent = io.PDBStrToEntity( -# zip_dict["accompanying_data"][part_path], -# profile=io.profiles["DEFAULT"], -# process=True, -# ) -# (pdb_ch_names, pdb_mdlsqe, pdb_resnum_range, pdb_ent_bfs) = ( -# _process_ent(pdb_ent) -# ) -# part_range = (part["part_start"], part["part_end"]) -# assert pdb_resnum_range == part_range # not really important -# exp_mdlsqe = src_seq[part_range[0] - 1 : part_range[1]] -# assert exp_mdlsqe == pdb_mdlsqe - -# # check CIF file -# (cif_ch_names, cif_mdlsqe, cif_resnum_range, cif_ent_bfs) = _process_ent( -# cif_ent -# ) -# assert seq_range == cif_resnum_range # NOTE: critical for kept_cif_cats -# assert cif_mdlsqe == mdl_seq -# if cif_ch_names != ["A", "B", "C"]: -# issues.append(("cif_ch_names", cif_ch_names, [])) -# # check b-factors (assume to match average or first chain) -# if mdl_id == "ma-taas-2867": -# # known to have faulty b-factors in CIF file; should use ones from PDB -# bfs_arr = np.asarray(pdb_ent_bfs).reshape(len(cif_ch_names), -1) -# bfs_avg = bfs_arr[0, :] -# else: -# bfs_arr = np.asarray(cif_ent_bfs).reshape(len(cif_ch_names), -1) -# bfs_avg = bfs_arr.mean(axis=0) -# assert len(bfs_avg) == len(cif_mdlsqe) -# if "per_res_conf" in json_dict: -# # for assemblies -# jd_sc = json_dict["per_res_conf"] -# max_diff = np.max(abs(bfs_avg - jd_sc)) -# if max_diff >= 0.01: -# long_data = [bfs_avg.tolist(), jd_sc] -# issues.append(("per_res_conf", max_diff, long_data)) -# else: -# # for domains -# jd_sc = json_dict["pLDDT"] -# max_diff = np.max(abs(bfs_arr[0, :] - jd_sc)) -# # b-factors known to be faulty for some models... -# if max_diff >= 0.01 and mdl_id[-4:] not in ["9293", "9344"]: -# long_data = [bfs_arr[0, :].tolist(), jd_sc] -# issues.append(("cif_b_factors", max_diff, long_data)) - -# # make sure prediction covers everything -# min_start = min(part["part_start"] for part in trunc_parts) -# max_end = max(part["part_end"] for part in trunc_parts) -# if min_start != seq_range[0] or max_end != seq_range[1]: -# short_data = [(min_start, max_end), seq_range] -# issues.append(("pred_range_mismatch", short_data, [])) -# assert len(set(part_names)) == len(trunc_parts) -# assert sorted(set(source_predictions)) == sorted(pred_ranges) - -# # all files there? -# exp_files = [] -# for pred in af2_preds: -# exp_files.extend(_get_af2_files(pred["prediction_p"])) -# for part in trunc_parts: -# if part["truncated_prediction_path"] != "model": -# exp_files.append(part["truncated_prediction_path"]) -# acc_files = sorted(zip_dict["accompanying_data"].keys()) -# assert len(set(exp_files) - set(acc_files)) == 0 -# extra_files = set(acc_files) - set(exp_files) -# if len(extra_files) > 0: -# long_data = [sorted(exp_files), acc_files] -# issues.append(("extra_acc_files", sorted(extra_files), long_data)) - -# # check SW -# sw_checked = set() -# claimed_sw = json_dict["software_used_for_prediction"] -# for kw_to_check in sw_kws_to_check: -# matching_sw = [ -# sw for sw in claimed_sw if all(kw in sw for kw in kw_to_check) -# ] -# assert len(matching_sw) == 1 -# sw_checked.add(matching_sw[0]) -# assert sorted(sw_checked) == sorted(claimed_sw) -# if "software_used_for_domain_annotation" in json_dict: -# claimed_sw = json_dict["software_used_for_domain_annotation"] -# exp_sw = [ -# "DSSP 2.3.0 (Kabsch and Sander 1983)", -# "Foldseek (van Kempen et al. 2024)", -# "SamCC-Turbo (Szczepaniak et al. 2020)", -# "US-align (Zhang et al. 
2022)", -# ] -# assert sorted(claimed_sw) == exp_sw - -# # QE checks -# plddts = json_dict["pLDDT"] -# plddt_range = (min(plddts), max(plddts)) -# if plddt_range[0] < 0 or plddt_range[1] > 100: -# issues.append(("plddt_range_mismatch", plddt_range, plddts)) -# if "MODELLER_energy_score" in json_dict: -# energy = json_dict["MODELLER_energy_score"] -# if min(energy) <= 0: -# issues.append(("energy_range_mismatch", min(energy), energy)) -# if "per_res_conf" in json_dict: -# per_res_confs = json_dict["per_res_conf"] -# prc_range = (min(per_res_confs), max(per_res_confs)) -# if prc_range[0] < 0 or prc_range[1] > 100: -# issues.append(("prc_range_mismatch", prc_range, per_res_confs)) -# if ( -# "MODELLER_energy_score" not in json_dict -# and "per_res_conf" in json_dict -# ): -# v1 = np.asarray(plddts) -# v2 = np.asarray(per_res_confs) -# qe_max_diff = np.max(abs(v1 - v2)) -# if qe_max_diff > 0.05: -# long_data = [plddts, per_res_confs] -# issues.append(("prc_plddt_diff", qe_max_diff, long_data)) - -# # check domains -# if mdl_type == "assembly": -# exp_num_kept_cif_cats = 3 -# else: -# exp_num_kept_cif_cats = 0 -# if exp_num_kept_cif_cats != len(kept_cif_cats): -# short_data = (exp_num_kept_cif_cats, sorted(kept_cif_cats.keys())) -# issues.append(("num_kept_cif_cats", short_data, kept_cif_cats)) -# # check categories -# if len(kept_cif_cats) == 3: -# # the following should all match after fixes appled in _load_cif_file -# pd_ids = sorted(kept_cif_cats["_pdbx_domain."]["id"]) -# if len(set(pd_ids)) != len(pd_ids): -# issues.append(("non_unique_pd_ids", [], pd_ids)) -# pfd_ids = kept_cif_cats["_pdbx_feature_domain."]["id"] -# if len(set(pfd_ids)) != len(pfd_ids): -# issues.append(("non_unique_pfd_ids", [], pfd_ids)) -# pfd_dom_ids = sorted( -# kept_cif_cats["_pdbx_feature_domain."]["domain_id"] -# ) -# if pd_ids != pfd_dom_ids: -# issues.append(("pfd_dom_ids", [], [pd_ids, pfd_dom_ids])) -# pdr_dom_ids = sorted( -# kept_cif_cats["_pdbx_domain_range."]["domain_id"]) -# if pd_ids != pdr_dom_ids: -# issues.append(("pdr_dom_ids", [], [pd_ids, pdr_dom_ids])) -# # special characters? -# for cat, cat_dict in kept_cif_cats.items(): -# for item, values in cat_dict.items(): -# for value in values: -# # I use the most permissive one -# if not re.fullmatch(mmcif_regex_desc, str(value)): -# invalid_chars = re.sub( -# mmcif_regex_desc, "", str(value) -# ) -# issues.append( -# ( -# "cif_invalid_chars", -# (f"{cat}{item}", invalid_chars), -# value, -# ) -# ) -# # matches the range? -# cat_dict = kept_cif_cats["_pdbx_domain_range."] -# res_list = list( -# zip( -# cat_dict["beg_label_asym_id"], -# cat_dict["beg_label_comp_id"], -# cat_dict["beg_label_seq_id"], -# ) -# ) + list( -# zip( -# cat_dict["end_label_asym_id"], -# cat_dict["end_label_comp_id"], -# cat_dict["end_label_seq_id"], -# ) -# ) -# for ch_name, res_name, res_num in res_list: -# res = cif_ent.FindResidue(ch_name, int(res_num)) -# if not res.IsValid() or not res.name == res_name: -# issues.append( -# ("invalid_dom_range_res", [ch_name, res_name, res_num], -# []) -# ) - -# return issues - - -# def _get_entities(json_dict, ncbi_data, cif_ent, mdl_type): -# """Gather data for the mmCIF (target) entities. 
-# Returns (list of target entities, list of issues) -# """ -# issues = [] -# # get NCBI data -# ncbi_ac = json_dict["source_sequence_RefSeq_ID"] -# ncbi_item = ncbi_data[ncbi_ac] -# ncbi_info = ncbi_item["info"] -# assert ncbi_ac == ncbi_info["AccessionVersion"] -# ncbi_seq = ncbi_item["seq_str"] -# # get json_dict data -# if mdl_type == "assembly": -# ent_desc = json_dict["source_sequence_description"] -# # full seq. -# seqres = json_dict["source_sequence"] -# ncbi_range = (1, len(ncbi_seq)) -# mdl_range = ncbi_range -# else: -# label = f'domain/motif "{json_dict["domain_name"]}"' -# ent_desc = f'{json_dict["source_sequence_description"]} ({label})' -# # cut seq. -# seqres = json_dict["domain_sequence"] -# ncbi_range = ( -# json_dict["domain_sequence_start"], -# json_dict["domain_sequence_end"], -# ) -# mdl_range = (1, len(seqres)) -# auth_seq_id_map = { -# pdb_idx: ncbi_idx -# for pdb_idx, ncbi_idx in zip( -# range(mdl_range[0], mdl_range[1] + 1), -# range(ncbi_range[0], ncbi_range[1] + 1), -# ) -# } -# # compare with NCBI -# ncbi_seq_cut = ncbi_seq[ncbi_range[0] - 1 : ncbi_range[1]] -# if seqres != ncbi_seq_cut: -# # one case exists with a single mismatch -# assert len(seqres) == len(ncbi_seq_cut) -# mismatches = [ -# (idx + 1, c1, c2) -# for idx, (c1, c2) in enumerate(zip(seqres, ncbi_seq_cut)) -# if c1 != c2 -# ] -# issues.append(("mismatches", mismatches, [seqres, ncbi_seq_cut])) -# else: -# mismatches = [] -# # fill data -# cif_ch_names, cif_mdlsqe, _, _ = _process_ent(cif_ent) -# entity = { -# "seqres": seqres, -# "ncbi_seq": ncbi_seq, -# "sqe_no_gaps": cif_mdlsqe, -# "mismatches": mismatches, -# "ncbi_range": ncbi_range, -# "mdl_range": mdl_range, -# "auth_seq_id_map": auth_seq_id_map, -# "pdb_chain_ids": cif_ch_names, -# "description": ent_desc, -# "ncbi_acv": ncbi_info["AccessionVersion"], -# "ncbi_gi": str(ncbi_info["Gi"]), -# "ncbi_taxid": str(ncbi_info["TaxId"]), -# "ncbi_organism": ncbi_info["SpeciesName"], -# "ncbi_last_mod": datetime.datetime.strptime( -# ncbi_info["UpdateDate"], "%Y/%m/%d" -# ), -# } -# return [entity], issues +def _read_csv(mdl_file, mdl_info_csv): + """Get info of a specific model from CSV file""" + mdl_file = os.path.basename(mdl_file) + + mdl_info = {} + with open(mdl_info_csv, newline="", encoding="ascii") as csvh: + info_r = csv.reader(csvh) + for row in info_r: + assert len(row) == 7 + if row[0].endswith(mdl_file): + # <UNP AC> + mdl_info["unp_ac"] = row[1] + # <UNP ALIGNMENT START> + mdl_info["unp_start"] = int(row[2]) + # <UNP ALIGNMENT END> + mdl_info["unp_end"] = int(row[3]) + # ?(IS THIS A UNP FIELD? MAYBE CODE?) 
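+            # <ENTITY DESCRIPTION>: column 5 of the CSV format documented in
+            # the argument parser help above, stored as the entity description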
+ mdl_info["protein_desc"] = row[4] + # <MODEL TITLE> + mdl_info["entry_title"] = row[5] + # <MODEL DESCRIPTION> + mdl_info["entry_desc"] = row[6] + break + if len(mdl_info) == 0: + _abort_msg(f"Could not find '{mdl_file}' in '{mdl_info_csv}'.") + + return mdl_info ################################################################################ ################################################################################ -# ModelCIF HANDLING +# HANDLE FULL DATA SET ################################################################################ -# pylint: disable=too-few-public-methods -# class _GlobalPCR( -# modelcif.qa_metric.Global, modelcif.qa_metric.NormalizedScore -# ): -# """Percentage of confident (score > 70) residues in [0,1]""" - -# name = "percent confident residues" -# software = None - - -# class _GlobalPRC(modelcif.qa_metric.Global, modelcif.qa_metric.PLDDT): -# """Average per-residue confidence (pLDDT-like) in [0,1]""" - -# name = "average per-residue confidence (pLDDT-like)" -# software = None - - -# class _GlobalPRC_pLDDT(modelcif.qa_metric.Global, modelcif.qa_metric.PLDDT): -# """Average per-residue confidence (pLDDT) in [0,1]""" - -# name = "average per-residue confidence (pLDDT)" -# software = None - - -# class _LocalPLDDT(modelcif.qa_metric.Local, modelcif.qa_metric.PLDDT): -# """Predicted accuracy according to the CA-only lDDT in [0,100]""" - -# name = "pLDDT" -# software = None - - -# class _LocalMOD(modelcif.qa_metric.Local, modelcif.qa_metric.Energy): -# """Per-residue MODELLER energy score (>0, lower is better).""" - -# name = "MODELLER energy score" -# software = None - - -# class _LocalPRC(modelcif.qa_metric.Local, modelcif.qa_metric.PLDDT): -# """Per-residue confidence score in [0,100].""" - -# name = "confidence score (pLDDT-like)" -# software = None - - -# class _NcbiTrgRef(modelcif.reference.TargetReference): -# """NCBI as target reference.""" - -# name = "NCBI" -# other_details = None - - -# class _LPeptideAlphabetWithXO(ihm.LPeptideAlphabet): -# """Have the default amino acid alphabet plus 'X' for unknown residues -# and 'O' as allowed non-def. AA (U already in alphabet).""" - -# # extra entry added according to LPeptideAlphabet def. in -# # https://python-ihm.readthedocs.io/en/latest/_modules/ihm.html -# # and https://files.rcsb.org/view/1NTH.cif for values for 'O'. 
- -# def __init__(self): -# """Create the alphabet.""" -# super().__init__() -# self._comps["X"] = self._comps["UNK"] -# self._comps["O"] = ihm.LPeptideChemComp( -# "PYL", "O", "O", "PYRROLYSINE", "C12 H21 N3 O3" -# ) - - -# # pylint: enable=too-few-public-methods - - -# # TEST: _get_not_modeled_residues from modelcif/util/make_mmcif.py -# def _get_not_modeled_residues(model): -# """Yield NotModeledResidueRange objects for all residue ranges in the -# Model that are not referenced by Atom, Sphere, or pre-existing -# NotModeledResidueRange objects""" -# for assem in model.assembly: -# asym = assem.asym if hasattr(assem, "asym") else assem -# if not asym.entity.is_polymeric(): -# continue -# # Make a set of all residue indices of this asym "handled" either -# # by being modeled (with Atom or Sphere objects) or by being -# # explicitly marked as not-modeled -# handled_residues = set() -# for rr in model.not_modeled_residue_ranges: -# if rr.asym_unit is asym: -# for seq_id in range(rr.seq_id_begin, rr.seq_id_end + 1): -# handled_residues.add(seq_id) -# for atom in model.get_atoms(): -# if atom.asym_unit is asym: -# handled_residues.add(atom.seq_id) -# # Convert set to a list of residue ranges -# handled_residues = ihm.util._make_range_from_list( -# sorted(handled_residues) -# ) -# # Return not-modeled for each non-handled range -# for r in ihm.util._invert_ranges( -# handled_residues, -# end=assem.seq_id_range[1], -# start=assem.seq_id_range[0], -# ): -# yield modelcif.model.NotModeledResidueRange(asym, r[0], r[1]) - - -# class _OST2ModelCIF(modelcif.model.AbInitioModel): -# """Map OST entity elements to ihm.model""" - -# def __init__(self, *args, **kwargs): -# """Initialise a model""" -# # process kwargs -# for i in ["ost_entity", "asym", "scores_json"]: -# if i not in kwargs: -# raise TypeError(f"Required keyword argument '{i}' not found.") -# self.ost_entity = kwargs.pop("ost_entity") -# self.asym = kwargs.pop("asym") -# self.scores_json = kwargs.pop("scores_json") - -# # call parent init (needs to pop new kwargs first!) -# super().__init__(*args, **kwargs) - -# # use auth IDs for res. nums and chain names -# self.use_auth = False - -# # fetch b-factors for all residues (for get_atoms) -# self.res_bfactors = { -# r.qualified_name: self.scores_json["b_factors"][i] -# for i, r in enumerate(self.ost_entity.residues) -# } - -# # need reverse mapping from chain name + res num to seq. id -# self.seq_id_map = { -# ch_id: {v: k for k, v in asym_unit.auth_seq_id_map.items()} -# for ch_id, asym_unit in self.asym.items() -# } - -# # explicitly add parts which were not modelled -# for ch in self.ost_entity.chains: -# # get covered range -# asym_unit = self.asym[_get_ch_name(ch, self.use_auth)] -# res_nums = [res.number.num for res in ch.residues] -# res_first = min(res_nums) -# res_last = max(res_nums) -# # assertion true in this set (no gaps in modelled res. nums) -# assert res_nums == list(range(res_first, res_last + 1)) -# # compare with ent seq. -# mdl_seq_first = self.seq_id_map[ch.name][res_first] -# mdl_seq_last = self.seq_id_map[ch.name][res_last] -# ent_seq_first, ent_seq_last = asym_unit.seq_id_range -# if mdl_seq_first != ent_seq_first: -# assert mdl_seq_first > ent_seq_first -# self.not_modeled_residue_ranges.append( -# modelcif.model.NotModeledResidueRange( -# asym_unit, -# ent_seq_first, -# mdl_seq_first - 1, -# ) -# ) -# # NOTE: the case below doesn't actually happen in this set... 
-# if mdl_seq_last != ent_seq_last: -# assert mdl_seq_last < ent_seq_last -# self.not_modeled_residue_ranges.append( -# modelcif.model.NotModeledResidueRange( -# asym_unit, -# mdl_seq_last + 1, -# ent_seq_last, -# ) -# ) -# # -> note: could be auto-filled as in modelcif/util/make_mmcif.py -# # (see commented _get_not_modeled_residues TEST here) -# # -> see https://github.com/ihmwg/python-modelcif/issues/37 - -# def get_atoms(self): -# # ToDo [internal]: Take B-factor out since its not a B-factor? -# # NOTE: this assumes that _get_res_num maps residue to pos. in seqres -# # within asym -# for atm in self.ost_entity.atoms: -# yield modelcif.model.Atom( -# asym_unit=self.asym[_get_ch_name(atm.chain, self.use_auth)], -# seq_id=self.seq_id_map[atm.chain.name][atm.residue.number.num], -# atom_id=atm.name, -# type_symbol=atm.element, -# x=atm.pos[0], -# y=atm.pos[1], -# z=atm.pos[2], -# het=atm.is_hetatom, -# biso=self.res_bfactors[atm.residue.qualified_name], -# occupancy=atm.occupancy, -# ) - -# def add_scores(self): -# """Add QA metrics from AF2 scores.""" -# # global scores -# for score, score_class in [ -# ("percent_confident_residues", _GlobalPCR), -# ("mean_per_res_conf", _GlobalPRC), -# ("mean_per_res_conf_plddt", _GlobalPRC_pLDDT), -# ]: -# if score in self.scores_json: -# self.qa_metrics.append(score_class(self.scores_json[score])) - -# # local scores -# for score, score_class in [ -# ("pLDDT", _LocalPLDDT), -# ("MODELLER_energy_score", _LocalMOD), -# ("per_res_conf", _LocalPRC), -# ]: -# if score in self.scores_json: -# i = 0 -# for chn in self.ost_entity.chains: -# for res in chn.residues: -# seq_id = self.seq_id_map[chn.name][res.number.num] -# asym = self.asym[chn.name].residue(seq_id) -# self.qa_metrics.append( -# score_class(asym, self.scores_json[score][i]) -# ) -# i += 1 - - -# def _get_modelcif_entities(target_ents, asym_units, system): -# """Create ModelCIF entities and asymmetric units.""" -# alphabet = _LPeptideAlphabetWithXO() -# for cif_ent in target_ents: -# # collect references -# ncbi_ref = _NcbiTrgRef( -# code=cif_ent["ncbi_acv"], -# accession=cif_ent["ncbi_gi"], -# ncbi_taxonomy_id=cif_ent["ncbi_taxid"], -# organism_scientific=cif_ent["ncbi_organism"], -# sequence_version_date=cif_ent["ncbi_last_mod"], -# sequence=cif_ent["ncbi_seq"], -# ) -# # add alignment incl. 
mismatches -# ncbi_ref.alignments.append( -# modelcif.reference.Alignment( -# db_begin=cif_ent["ncbi_range"][0], -# db_end=cif_ent["ncbi_range"][1], -# entity_begin=cif_ent["mdl_range"][0], -# entity_end=cif_ent["mdl_range"][1], -# seq_dif=[ -# ihm.reference.SeqDif( -# seq_id, alphabet[olc_db], alphabet[olc] -# ) -# for seq_id, olc, olc_db in cif_ent["mismatches"] -# ], -# ) -# ) -# # -# references = [ncbi_ref] -# # combine into ModelCIF entity -# mdlcif_ent = modelcif.Entity( -# cif_ent["seqres"], -# description=cif_ent["description"], -# alphabet=alphabet, -# source=ihm.source.Natural( -# ncbi_taxonomy_id=cif_ent["ncbi_taxid"], -# scientific_name=cif_ent["ncbi_organism"], -# ), -# references=references, -# ) -# # NOTE: this assigns (potentially new) alphabetic chain names -# for pdb_chain_id in cif_ent["pdb_chain_ids"]: -# asym_units[pdb_chain_id] = modelcif.AsymUnit( -# mdlcif_ent, -# strand_id=pdb_chain_id, -# auth_seq_id_map=cif_ent["auth_seq_id_map"], -# ) -# system.entities.append(mdlcif_ent) - - -# def _get_assoc_af2_pdb_data(pred): -# """Generate a modelcif.data.Data object for an AF2 prediction.""" -# return modelcif.data.Data( -# name=f"AlphaFold-Multimer (v{pred['version']}) model for range " -# + f"{_get_af2_range_string(pred)}; pTM {pred['pTM']}, " -# + f"ipTM {pred['ipTM']}", -# ) - - -# def _get_assoc_af2_json_data(af2_path): -# """Generate a modelcif.data.Data object for JSON file -# with extra QE for PDB file in af2_path. -# """ -# return modelcif.data.Data( -# name=f"Detailed quality estimates (pLDDT, PAE) for {af2_path}", -# ) - - -# def _get_assoc_pae_png_data(af2_path): -# """Generate a modelcif.data.Data object for PNG file -# with PAE plot for PDB file in af2_path. -# """ -# return modelcif.data.Data( -# name=f"Plot showing PAE matrix for {af2_path}", -# ) - - -# def _get_assoc_trunc_pdb_data(part, is_single): -# """Generate a modelcif.associated.File object pointing to PDB file -# for truncated model an AF2 prediction. -# """ -# if is_single: -# mdl_txt = "model" -# else: -# mdl_txt = "and superposed model" -# return modelcif.data.Data( -# name=f"Truncated {mdl_txt} for range {part['part_start']}-" -# + f"{part['part_end']} derived from AlphaFold-Multimer model " -# + f"{part['source_prediction']}.pdb", -# ) - - -# def _get_associated_file( -# fle_path, data, file_format="other", file_content="other" -# ): -# """Generate a modelcif.associated.File object for given data.""" -# afile = modelcif.associated.File( -# fle_path, -# details=data.name, -# data=data, -# ) -# afile.file_format = file_format -# afile.file_content = file_content -# return afile - - -# def _get_associated_files(mdl_name, arc_files): -# """Create entry for associated files.""" -# # package all into zip file -# return modelcif.associated.Repository( -# "", -# [modelcif.associated.ZipFile(f"{mdl_name}.zip", files=arc_files)], -# ) -# # NOTE: by convention MA expects zip file with same name as model-cif - - -# global_ref_dbs = {} - - -# def _get_ref_db_object(name, url, version=None, release_date=None): -# """Cached access to modelcif.ReferenceDatabase objects. -# Needed to remove duplicates in ModelCIF. -# """ -# key = (name, url, version, release_date) -# if key not in global_ref_dbs: -# global_ref_dbs[key] = modelcif.ReferenceDatabase( -# name, url, version, release_date -# ) -# return global_ref_dbs[key] - - -# def _get_sequence_dbs(config_data): -# """Get AF seq. DBs.""" -# # hard coded UniProt release (see https://www.uniprot.org/release-notes) -# # (TO BE UPDATED FOR EVERY DEPOSITION!) 
-# pdb_rel_date = config_data["pdb_rel_date"] -# up_version = config_data["up_version"] -# up_version_dates = { -# "2021_03": datetime.datetime(2021, 6, 2), -# "2022_05": datetime.datetime(2022, 12, 14), -# } -# up_rel_date = up_version_dates[up_version] -# # fill list of DBs -# seq_dbs = [] -# if config_data["use_small_bfd"]: -# seq_dbs.append( -# _get_ref_db_object( -# "Reduced BFD", -# "https://storage.googleapis.com/alphafold-databases/" -# + "reduced_dbs/bfd-first_non_consensus_sequences.fasta.gz", -# ) -# ) -# else: -# seq_dbs.append( -# _get_ref_db_object( -# "BFD", -# "https://storage.googleapis.com/alphafold-databases/" -# + "casp14_versions/" -# + "bfd_metaclust_clu_complete_id30_c90_final_seq.sorted_opt.tar.gz", -# version="6a634dc6eb105c2e9b4cba7bbae93412", -# ) -# ) -# if config_data["af_version"] < "2.3.0": -# seq_dbs.append( -# _get_ref_db_object( -# "MGnify", -# "https://storage.googleapis.com/alphafold-databases/" -# + "casp14_versions/mgy_clusters_2018_12.fa.gz", -# version="2018_12", -# release_date=datetime.datetime(2018, 12, 6), -# ) -# ) -# seq_dbs.append( -# _get_ref_db_object( -# "Uniclust30", -# "https://storage.googleapis.com/alphafold-databases/" -# + "casp14_versions/uniclust30_2018_08_hhsuite.tar.gz", -# version="2018_08", -# release_date=None, -# ) -# ) -# else: -# # NOTE: release date according to https://ftp.ebi.ac.uk/pub/... -# seq_dbs.append( -# _get_ref_db_object( -# "MGnify", -# "https://storage.googleapis.com/alphafold-databases/" -# + "v2.3/mgy_clusters_2022_05.fa.gz", -# version="2022_05", -# release_date=datetime.datetime(2022, 5, 6), -# ) -# ) -# seq_dbs.append( -# _get_ref_db_object( -# "UniRef30", -# "https://storage.googleapis.com/alphafold-databases/" -# + "v2.3/UniRef30_2021_03.tar.gz", -# version="2021_03", -# release_date=None, -# ) -# ) -# if config_data["use_multimer"]: -# seq_dbs.append( -# _get_ref_db_object( -# "TrEMBL", -# "ftp://ftp.ebi.ac.uk/pub/databases/uniprot/current_release/" -# + "knowledgebase/complete/uniprot_trembl.fasta.gz", -# version=up_version, -# release_date=up_rel_date, -# ) -# ) -# seq_dbs.append( -# _get_ref_db_object( -# "Swiss-Prot", -# "ftp://ftp.ebi.ac.uk/pub/databases/uniprot/current_release/" -# + "knowledgebase/complete/uniprot_sprot.fasta.gz", -# version=up_version, -# release_date=up_rel_date, -# ) -# ) -# seq_dbs.append( -# _get_ref_db_object( -# "UniRef90", -# "ftp://ftp.uniprot.org/pub/databases/uniprot/uniref/uniref90/" -# + "uniref90.fasta.gz", -# version=up_version, -# release_date=up_rel_date, -# ) -# ) -# if config_data["use_templates"]: -# if config_data["use_multimer"]: -# # uses whatever is latest set of PDB sequences -# # see AF2 scripts/download_pdb_seqres.sh -# seq_dbs.append( -# _get_ref_db_object( -# "PDB seqres", -# "https://ftp.wwpdb.org/pub/pdb/derived_data/pdb_seqres.txt", -# release_date=pdb_rel_date, -# ) -# ) -# else: -# # fixed version used in AF2 scripts/download_pdb70.sh -# seq_dbs.append( -# _get_ref_db_object( -# "PDB70", -# "http://wwwuser.gwdg.de/~compbiol/data/hhsuite/databases/" -# + "hhsuite_dbs/old-releases/pdb70_from_mmcif_200401.tar.gz", -# release_date=datetime.datetime(2020, 4, 1), -# ) -# ) -# return seq_dbs - - -# def _assemble_modelcif_software(soft_dict, params_dict): -# """Create a modelcif.SoftwareWithParameters instance from dictionaries.""" -# # create SW object -# sw = modelcif.Software( -# soft_dict["name"], -# soft_dict["classification"], -# soft_dict["description"], -# soft_dict["location"], -# soft_dict["type"], -# soft_dict["version"], -# 
citation=soft_dict["citation"], -# ) -# # assemble parameters -# params = [] -# for key, val in params_dict.items(): -# params.append(modelcif.SoftwareParameter(key, val)) -# # put them together -# return modelcif.SoftwareWithParameters(sw, params) - - -# def _get_modelcif_protocol_software(js_step): -# """Assemble software entries for a ModelCIF protocol step.""" -# # new setup in python-modelcif (as of late 2023): params with each SW -# sw_list = [] -# for sw, sw_params in js_step["software_plus_params"]: -# sw_list.append(_assemble_modelcif_software(sw, sw_params)) -# # group and done... -# if sw_list: -# return modelcif.SoftwareGroup(sw_list) -# else: -# return None - - -# data_group_cache = {} - - -# def _get_modelcif_protocol_data(data_labels, target_entities, model, -# acc_data): -# """Assemble data for a ModelCIF protocol step. -# Cached access to objects needed to remove duplicates in ModelCIF. -# """ -# cache_key = "|".join(sorted(data_labels)) -# if cache_key in data_group_cache: -# return data_group_cache[cache_key] -# data = modelcif.data.DataGroup() -# for data_label in data_labels: -# if data_label.startswith("target_sequences_"): -# data.extend(target_entities) -# elif data_label == "ref_dbs_AF2_2.1.2": -# # HC for that version -# data.extend( -# _get_sequence_dbs( -# { -# "pdb_rel_date": datetime.datetime(2021, 11, 5), -# "up_version": "2021_03", -# "use_small_bfd": True, -# "af_version": "2.1.2", -# "use_multimer": True, -# "use_templates": True, -# } -# ) -# ) -# elif data_label == "ref_dbs_AF2_2.3.1": -# # HC for that version -# data.extend( -# _get_sequence_dbs( -# { -# "pdb_rel_date": datetime.datetime(2022, 12, 9), -# "up_version": "2022_05", -# "use_small_bfd": True, -# "af_version": "2.3.1", -# "use_multimer": True, -# "use_templates": True, -# } -# ) -# ) -# elif data_label.startswith("model_"): -# data.append(model) -# elif data_label in acc_data: -# data.append(acc_data[data_label]) -# else: -# raise RuntimeError(f"Unknown protocol data: '{data_label}'") -# data_group_cache[cache_key] = data -# return data - - -# def _get_modelcif_protocol(protocol_steps, target_entities, model, acc_data): -# """Create the protocol for the ModelCIF file.""" -# protocol = modelcif.protocol.Protocol() -# for js_step in protocol_steps: -# sftwre = _get_modelcif_protocol_software(js_step) -# input_data = _get_modelcif_protocol_data( -# js_step["input"], target_entities, model, acc_data -# ) -# output_data = _get_modelcif_protocol_data( -# js_step["output"], target_entities, model, acc_data -# ) - -# protocol.steps.append( -# modelcif.protocol.Step( -# input_data=input_data, -# output_data=output_data, -# name=js_step["name"], -# details=js_step["details"], -# software=sftwre, -# ) -# ) -# protocol.steps[-1].method_type = js_step["method_type"] -# return protocol - - -# def _compress_cif_file(cif_file): -# """Compress cif file and delete original.""" -# with open(cif_file, "rb") as f_in: -# with gzip.open(cif_file + ".gz", "wb") as f_out: -# shutil.copyfileobj(f_in, f_out) -# os.remove(cif_file) - - -# def _package_associated_files(repo, dict_data): -# """Compress associated files into single zip file and delete original. -# Data can either be on disk or available in dict_data (key = path). 
-# """ -# # zip settings tested for good speed vs compression -# for archive in repo.files: -# with zipfile.ZipFile(archive.path, "w", zipfile.ZIP_BZIP2) as cif_zip: -# for zfile in archive.files: -# zpath = zfile.path -# if zpath in dict_data: -# cif_zip.writestr(zpath, dict_data[zpath]) -# else: -# cif_zip.write(zpath, arcname=zpath) -# os.remove(zpath) - - -# def _pkl_to_json(pkl_data, pae_handling="keep"): -# """Make pkl_data JSON writable. -# Options for pae_handling: -# - keep = values kept as they are -# - round = values kept as closest integers -# - drop = drop whole table -# """ -# pkl_json_data = {} -# for k, v in pkl_data.items(): -# if k == "PAE": -# if pae_handling == "keep": -# pkl_json_data["PAE"] = pkl_data["PAE"].tolist() -# elif pae_handling == "round": -# pae = pkl_data["PAE"].round().astype(int).tolist() -# pkl_json_data["PAE"] = pae -# elif pae_handling == "drop": -# pass -# else: -# raise RuntimeError( -# f"Unknown pae_handling value {pae_handling}" -# ) -# elif k != "pLDDT_chains": -# if type(v) == np.ndarray: -# # note: scalars become scalars! -# v = v.tolist() -# pkl_json_data[k] = v -# return pkl_json_data - - -# def _get_sw_for_qe(steps, step_name): -# """Fetch suitable SW objects from protocol steps to use in QE.""" -# # to maximally reduce duplicates we reuse single groups -# # otherwise new group created using same SoftwareWithParameters objects -# sw_groups = [step.software for step in steps if step.name == step_name] -# if len(sw_groups) == 0: -# return None -# elif len(sw_groups) == 1: -# return sw_groups[0] -# else: -# # each sw_group is a list (SoftwareGroup) of SoftwareWithParameters -# # ...and we remove duplicates...just in case -# sw_dict = {} -# for sw_group in sw_groups: -# sw_dict.update({hash(swp): swp for swp in sw_group}) -# return modelcif.SoftwareGroup(sw_dict.values()) - - -# def _store_as_modelcif( -# data_json, -# ost_ent, -# zip_dict, -# kept_cif_cats, -# out_dir, -# mdl_name, -# compress, -# add_extra_files, -# ): -# """Mix all the data into a ModelCIF file.""" -# print(" generating ModelCIF objects...", end="") -# pstart = timer() -# # create system to gather all the data -# system = modelcif.System( -# title=data_json["title"], -# id=data_json["mdl_id"].upper(), -# model_details=data_json["model_details"], -# ) - -# # create an asymmetric unit and an entity per target sequence -# asym_units = {} -# _get_modelcif_entities(data_json["target_entities"], asym_units, system) - -# # audit_authors -# system.authors.extend(data_json["audit_authors"]) - -# # set up the model to produce coordinates -# model = _OST2ModelCIF( -# assembly=modelcif.Assembly(asym_units.values()), -# asym=asym_units, -# ost_entity=ost_ent, -# scores_json=data_json["scores"], -# name=None, -# ) -# # TEST -# # model.not_modeled_residue_ranges.extend( -# # _get_not_modeled_residues(model)) -# # -# print(f" ({timer()-pstart:.2f}s)") -# print(" processing QA scores...", end="", flush=True) -# pstart = timer() -# model.add_scores() -# print(f" ({timer()-pstart:.2f}s)") - -# model_group = modelcif.model.ModelGroup([model]) -# system.model_groups.append(model_group) - -# # handle additional files -# acc_data = {} -# acc_files = [] -# dict_data = {} -# for pred in data_json["af2_preds"]: -# # get data objects -# pdb_file, pkl_file, pae_file = _get_af2_files(pred["prediction_p"]) -# acc_data[pdb_file] = _get_assoc_af2_pdb_data(pred) -# # NOTE: we keep data object for mismatching PKL file even though we -# # do not dump file itself -# acc_data[pkl_file] = 
_get_assoc_af2_json_data(pdb_file) -# acc_data[pae_file] = _get_assoc_pae_png_data(pdb_file) -# if add_extra_files: -# # get file objects -# acc_files.append( -# _get_associated_file(pdb_file, acc_data[pdb_file]) -# ) -# if pkl_file not in data_json["pkl_files_to_skip"]: -# json_file = f"{os.path.splitext(pkl_file)[0]}.json" -# acc_files.append( -# _get_associated_file( -# json_file, acc_data[pkl_file], file_format="json" -# ) -# ) -# acc_files.append( -# _get_associated_file(pae_file, acc_data[pae_file]) -# ) -# # get file content -# dict_data[pdb_file] = zip_dict["accompanying_data"][pdb_file] -# if pkl_file not in data_json["pkl_files_to_skip"]: -# pkl_data = pickle.loads( -# zip_dict["accompanying_data"][pkl_file] -# ) -# pkl_json_data = _pkl_to_json(pkl_data, pae_handling="round") -# dict_data[json_file] = json.dumps(pkl_json_data) -# dict_data[pae_file] = zip_dict["accompanying_data"][pae_file] -# for part in data_json["trunc_parts"]: -# part_path = part["truncated_prediction_path"] -# if part_path != "model": -# acc_data[part_path] = _get_assoc_trunc_pdb_data( -# part, len(data_json["trunc_parts"]) == 1 -# ) -# if add_extra_files: -# acc_files.append( -# _get_associated_file(part_path, acc_data[part_path]) -# ) -# dict_data[part_path] = \ -# zip_dict["accompanying_data"][part_path] -# if acc_files: -# system.repositories.append(_get_associated_files(mdl_name, acc_files)) - -# # get data and steps -# protocol = _get_modelcif_protocol( -# data_json["protocol"], -# system.entities, -# model, -# acc_data, -# ) -# system.protocols.append(protocol) - -# # set SW for QE -# _LocalPLDDT.software = _get_sw_for_qe(protocol.steps, "AF2") -# _GlobalPRC_pLDDT.software = _LocalPLDDT.software -# _LocalMOD.software = _get_sw_for_qe(protocol.steps, "MODELLER") - -# # write modelcif System to file -# print(" write to disk...", end="", flush=True) -# pstart = timer() -# # NOTE: this will dump PAE on path provided in add_scores -# # -> hence we cheat by changing path and back while being ... -# oldpwd = os.getcwd() -# os.chdir(out_dir) -# mdl_fle = f"{mdl_name}.cif" -# try: -# # dump to string and fix with gemmi -# with StringIO() as mmcif_fh: -# modelcif.dumper.write(mmcif_fh, [system]) -# modelcif_str = mmcif_fh.getvalue() -# doc = gemmi.cif.read_string(modelcif_str) -# block = doc.sole_block() -# # HACK: set all label_alt_id to '-' -# # -> see https://github.com/wwpdb-dictionaries/mmcif_pdbx/issues/60 -# if kept_cif_cats: -# cat_dict = block.get_mmcif_category("_atom_site.") -# cat_dict["label_alt_id"] = ["-"] * len(cat_dict["label_alt_id"]) -# block.set_mmcif_category("_atom_site.", cat_dict) -# cat_dict = kept_cif_cats["_pdbx_domain_range."] -# new_alt_ids = ["-"] * len(cat_dict["beg_label_alt_id"]) -# cat_dict["beg_label_alt_id"] = new_alt_ids -# new_alt_ids = ["-"] * len(cat_dict["end_label_alt_id"]) -# cat_dict["end_label_alt_id"] = new_alt_ids -# # -# for cat, cat_dict in kept_cif_cats.items(): -# block.set_mmcif_category(cat, cat_dict) -# doc.write_file(mdl_fle) -# # extra files -# with open(f"{mdl_name}-image.png", "wb") as fh: -# fh.write(zip_dict["image_file"]) -# if acc_files: -# _package_associated_files(system.repositories[0], dict_data) -# if compress: -# _compress_cif_file(mdl_fle) -# mdl_fle += ".gz" -# finally: -# os.chdir(oldpwd) -# print(f" ({timer()-pstart:.2f}s)") - +def _update_modelcif(mdl_file, mdl_info, unp_json_file, out_dir, compress): + """Update ModelCIF file with model info and verify UNP related data. 
+ Caution: This is for updates BEFORE deposition, this update does not do + a CIF-style revision history for you!""" + + block = _MABlock(mdl_file) + assert len(block.polymer_targets) == 1 + target = block.polymer_targets[0] + unp = UniProtKBEntryCache(unp_json_file) + # print(mdl_info) + unp, db_aln, seq_aln = unp.match_sequence( + mdl_info["unp_ac"], + target["sequence"], + mdl_info["unp_start"], + mdl_info["unp_end"], + ) -################################################################################ + # update ModelCIF data + block.add_to_category( + "struct", + title=mdl_info["entry_title"], + pdbx_model_details=mdl_info["entry_desc"], + ) + block.add_to_category( + "entity", + pdbx_description=mdl_info["protein_desc"], + match=("id", target["entity_id"]), + ) + # Update ModelCIF files with UniProtKB info + struct_ref_id = "1" + block.add_category( + "struct_ref", + id=[struct_ref_id], + entity_id=[target["entity_id"]], + db_name=["UNP"], + db_code=[unp.unp_id], + pdbx_db_accession=[unp.unp_ac], + pdbx_align_begin=[mdl_info["unp_start"]], + pdbx_seq_one_letter_code=[unp.unp_seq], + details=[unp.unp_details_full], + after="entity", + ) + block.add_category( + "struct_ref_seq", + align_id=["1"], + ref_id=[struct_ref_id], + seq_align_beg=[seq_aln[0]], + seq_align_end=[seq_aln[1]], + db_align_beg=[db_aln[0]], + db_align_end=[db_aln[1]], + after="struct_ref", + ) + block.add_category( + "ma_target_ref_db_details", + target_entity_id=[target["entity_id"]], + db_name=["UNP"], + db_name_other_details=[False], + db_code=[unp.unp_id], + db_accession=[unp.unp_ac], + seq_db_isoform=[False], + seq_db_align_begin=[db_aln[0]], + seq_db_align_end=[db_aln[1]], + ncbi_taxonomy_id=[unp.ncbi_taxid], + organism_scientific=[unp.organism_species], + seq_db_sequence_version_date=[date.isoformat(unp.last_seq_change)], + seq_db_sequence_checksum=[unp.unp_crc64], + after="struct_ref_seq", + ) + # write, if requested write compressed + if out_dir is not None: + block.write_file( + os.path.join(out_dir, os.path.basename(mdl_file)), compress + ) ################################################################################ -# HANDLE FULL DATA SET +# FUNCTIONALITY THAT MAY MOVE INTO ITS OWN MODULE ################################################################################ -# def _translate2modelcif(ncbi_data, opts): -# """Convert a model with its accompanying data to ModelCIF.""" -# # get main names -# mdl_id = os.path.splitext(os.path.basename(opts.input_zip_path))[0] -# if opts.compress: -# cifext = "cif.gz" -# else: -# cifext = "cif" -# mdl_path = os.path.join(opts.out_dir, f"{mdl_id}.{cifext}") -# # skip if done already (disabled here) -# # if os.path.exists(mdl_path): -# # print(f" {mdl_id} already done...") - -# # prepare data for model to convert (also gets all issues) -# issues = [] -# abort_after_checks = opts.checks_only -# zip_dict = _get_zip_content(opts.input_zip_path) -# mdl_type = zip_dict["mdl_type"] -# if zip_dict["unparsed_files"]: -# file_list = zip_dict["unparsed_files"] -# issues.append(("unparsed_files", len(file_list), file_list)) -# json_dict = json.loads(zip_dict["json_file"]) -# # apply hard-coded fixes for some models -# if mdl_id == "ma-taas-2330": -# # find entry to fix -# for pred in json_dict["AF_predictions"]: -# if pred["prediction_p"] == "AFv2.3.1_389-515_mergeN_21.pdb": -# pred["prediction_p"] = "AFv2.3.1_389-515.pdb" -# del pred["merged_GCN4linker_len_N"] -# del pred["merged_GCN4linker_len_C"] -# elif mdl_id == "ma-taas-9424": -# # len(json_dict["pLDDT"]) was 53 instead 
of 52 -# # -> also in wrong abstract... -# json_dict["pLDDT"] = json_dict["pLDDT"][:52] -# json_dict["abstract"] = json_dict["abstract"].replace( -# "53 residues", "52 residues" -# ) -# cif_ent, kept_cif_cats = _load_cif_file(zip_dict) -# try: -# issues.extend( -# _do_sanity_checks( -# mdl_id, json_dict, zip_dict, cif_ent, kept_cif_cats -# ) -# ) -# entities, ent_issues = _get_entities( -# json_dict, ncbi_data, cif_ent, mdl_type -# ) -# issues.extend(ent_issues) -# except Exception as ex: -# short_txt = f"{type(ex).__name__}: {ex}" -# long_txt = traceback.format_exc() -# issues.append(("exception", short_txt, long_txt)) -# abort_after_checks = True - -# # dump issues -# issues_file_path = os.path.join(opts.out_dir, f"{mdl_id}-issues.json") -# json.dump(issues, open(issues_file_path, "w")) -# # abort if needed -# if abort_after_checks: -# return -# # do the actual conversion -# print(f" translating {mdl_id}...") -# pdb_start = timer() +class _MABlock: + """gemmi.cif wrapper that skips reading/ documents and jumps right into + gemmi.cif.Block objects. You have all the gemmi.cif.Block functionality + available plus our own convenience functions on top.""" + + def __init__(self, model_data): + """Initialise a single Block from a file.""" + self.source = model_data + self.doc = cif.read(model_data) + self.block = self.doc.sole_block() + + self._targets = None + self._polymer_targets = None + + def __getattr__(self, name): + """If an attribute is not found, try self.block before exception.""" + # The catch here is: when asking for self.foo, + # self.__getattribute__(self, "foo") is executed first. If "foo" is + # not found in self, self.__getattr__(self, "foo") is called. So here + # we already know, that "foo" is not there and we can check the + # original block. + if hasattr(self.block, name): + return getattr(self.block, name) + raise AttributeError( + f"'{type(self).__name__}' object has no attribute '{name}'" + ) + + @property + def targets(self): + """Info about targets.""" + if self._targets is not None: + return self._targets + self._targets = {} + table = self.find("_ma_target_entity.", ["entity_id"]) + for row in table: + if row["entity_id"] in self._targets: + raise RuntimeError( + f"Target with entity_id '{row['entity_id']}' is duplicated." + ) + self._targets[row["entity_id"]] = { + "entity_id": row["entity_id"], # makes live easier for singles + "sequence": self.get_sequence(row["entity_id"]), + } + table = self.find("_entity.", ["id", "type"]) + for row in table: + self._targets[row["id"]]["type"] = row["type"] + + return self._targets + + @property + def polymer_targets(self): + """Only targets of entity type 'polymer'.""" + if self._polymer_targets is not None: + return self._polymer_targets + self._polymer_targets = [] + for target in self.targets.values(): + if target["type"] == "polymer": + self._polymer_targets.append(target) + + return self._polymer_targets + + def find(self, name, columns): + """Get a table with defined colums. Throws an exception if table is not + found.""" + table = self.block.find(name, columns) + if len(table) == 0: + raise RuntimeError( + f"""Table '{name}' with columns '{"', '".join(columns)}' """ + + "not found." + ) + + return table + + def get_sequence(self, entity): + """Get the sequence of a 'polymer' entity. `entity` is the numeric ID + of the entity. 
+ Reading sequences is inefficient atm, it reads the whole table for + every sequence w/o any caching.""" + table = self.find("_entity_poly_seq.", ["entity_id", "num", "mon_id"]) + sequence = "" + num = 0 + for row in table: + if row["entity_id"] != entity: + continue + num += 1 + assert int(row["num"]) == num + sequence += gemmi.find_tabulated_residue( + row["mon_id"] + ).one_letter_code + + return sequence + + def write_file(self, filename, compress=False, style=cif.Style.Simple): + """Write ModelCIF file to disk, compress upon request. + Will compress anyways if file ends with '.gz'.""" + if compress or filename.endswith(".gz"): + if not filename.endswith(".gz"): + filename += ".gz" + with gzip.open(filename, mode="wt", compresslevel=9) as gfh: + gfh.write(self.doc.as_string(style)) + else: + self.doc.write_file(filename, style) + + def add_to_category(self, category, match=None, **kwargs): + """Add item values to a category. + Keyword arguments are reserved for item names.""" + if category[0] != "_": + category = "_" + category + if category[-1] != ".": + category += "." + items = list(kwargs.keys()) + row = None + if match is not None: + table = self.find(category, items + [match[0]]) + for row in table: + if row[match[0]] == match[1]: + break + if row is None: + raise RuntimeError( + f"No item {match[0]}=={match[1]} found in category " + + f"{category}." + ) + else: + table = self.find(category, items) + assert len(table) == 1 + row = table[0] + for itm, val in kwargs.items(): + if row[itm] not in [".", "?"]: + print( + f" replacing '{cif.as_string(row[itm])}' with " + + f"'{val}' ({itm})" + ) + # ToDo: handle quoting the same way as for add_category + row[itm] = cif.quote(val) if " " in val else val + + def add_category(self, category, after=None, **kwargs): + """Add a new category to the block with only 1 set of values, 1 row + thinking of categories as tables. kwargs are reserved for item/ value + pairs. after is a special keyword parameter to locate the new category + inside the block.""" + if not category.startswith("_"): + category = "_" + category + # handle quoting + for values in kwargs.values(): + for i, val in enumerate(values): + if ( + isinstance(val, str) + and " " in val + and not (val.startswith("'") and val.endswith("'")) + and not (val.startswith('"') and val.endswith('"')) + ): + values[i] = cif.quote(val) + self.block.set_mmcif_category(category, kwargs, raw=True) + + if after is None: + return + if not after.startswith("_"): + after = "_" + after + if not after.endswith("."): + after += "." + table = self.block.find_mmcif_category(after) + idx = self.block.get_index(table.tags[-1]) + # be careful with move_item: loops are 1 item and move as a whole, + # single line values move per item/ value par. 
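+        # (set_mmcif_category() above appended the new category as the last
+        #  item of the block and get_index() points at the 'after' category,
+        #  so moving item -1 to idx + 1 places the new loop right behind it)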
+ self.block.move_item(-1, idx + 1) -# # gather data into JSON-like structure -# print(" preparing data...", end="") -# pstart = timer() -# af2_preds, trunc_parts = _get_preds_and_parts(json_dict, mdl_type) -# mdlcf_json = { -# "title": json_dict["title"].strip(), -# "model_details": json_dict["abstract"].strip(), -# "audit_authors": _get_audit_authors(), -# "af2_preds": af2_preds, -# "trunc_parts": trunc_parts, -# "protocol": _get_protocol_steps_and_software( -# json_dict, mdl_type, mdl_id -# ), -# "mdl_id": mdl_id, # used for entry ID -# "target_entities": entities, -# "scores": { -# score: json_dict[score] -# for score in [ -# "pLDDT", -# "MODELLER_energy_score", -# "per_res_conf", -# "percent_confident_residues", -# "mean_per_res_conf", -# ] -# if score in json_dict -# }, -# "pkl_files_to_skip": [], -# } -# # fix percentage to [0,1] -# mdlcf_json["scores"]["percent_confident_residues"] /= 100.0 -# # enlarge local scores to be for each residue -# for score in ["pLDDT", "MODELLER_energy_score", "per_res_conf"]: -# if score in mdlcf_json["scores"]: -# mdlcf_json["scores"][score] *= cif_ent.chain_count -# # note: we overwrite b-factors from file! -# # -> also choose which global score to use for mean_per_res_conf -# if "per_res_conf" in mdlcf_json["scores"]: -# mdlcf_json["scores"]["b_factors"] = mdlcf_json["scores"][ -# "per_res_conf" -# ] -# else: -# mdlcf_json["scores"]["b_factors"] = mdlcf_json["scores"]["pLDDT"] -# mdlcf_json["scores"]["mean_per_res_conf_plddt"] = mdlcf_json[ -# "scores" -# ].pop("mean_per_res_conf") -# # check for inconsistent pkl files -# for issue_type, short_data, long_data in issues: -# if issue_type.startswith("pkl_") and issue_type.endswith("_diff"): -# mdlcf_json["pkl_files_to_skip"].append(short_data[1]) -# print(f" ({timer()-pstart:.2f}s)") -# # save ModelCIF -# _store_as_modelcif( -# data_json=mdlcf_json, -# ost_ent=cif_ent, -# zip_dict=zip_dict, -# kept_cif_cats=kept_cif_cats, -# out_dir=opts.out_dir, -# mdl_name=mdl_id, -# compress=opts.compress, -# add_extra_files=(not opts.no_extra_files), -# ) - -# # check if result can be read and has expected seq. -# ent, ss = io.LoadMMCIF(mdl_path, seqres=True) -# ent_seqres = [ss.FindSequence(chn.name).string for chn in ent.chains] -# exp_seqres = [] -# for trg_ent in mdlcf_json["target_entities"]: -# exp_seqres += [trg_ent["seqres"]] * len(trg_ent["pdb_chain_ids"]) -# assert ent_seqres == exp_seqres, f"Bad seqres {mdl_id}" -# # check model itself -# ch_names, mdlsqe, resnum_range, ent_bfs = _process_ent(ent) -# assert ch_names == ["A", "B", "C"] -# target_entity = mdlcf_json["target_entities"][0] -# assert mdlsqe == target_entity["sqe_no_gaps"] -# assert max(resnum_range) <= len(target_entity["seqres"]) -# if mdl_type == "assembly": -# seq_range = ( -# json_dict["predicted_sequence_start"], -# json_dict["predicted_sequence_end"], -# ) -# assert resnum_range == seq_range -# else: -# assert resnum_range == (1, len(mdlsqe)) -# exp_bfs = mdlcf_json["scores"]["b_factors"] -# bfs_max_diff = np.max(abs(np.asarray(ent_bfs) - np.asarray(exp_bfs))) -# assert bfs_max_diff < 0.01 - -# print(f" ... done with {mdl_id} ({timer()-pdb_start:.2f}s).") +################################################################################ def _main(): """Run as script.""" - # 1. Update ModelCIF files with UniProtKB AC incl. range, entity - # description text, entry title, entry description from CSV file - # 2. Validate sequence compared to UniProt (incl. checking/fetching seq. 
- # version as done in SCHWED-6162 for Ian's set) opts = _parse_args() - # handle single entry (print statements kept consistent with other sets) - print("Working on models...") - - print("... done with models.") + # read model info from CSV + mdl_info = _read_csv(opts.modelcif, opts.model_info_csv) + print(f"Working on model {opts.modelcif}...") + _update_modelcif( + opts.modelcif, + mdl_info, + opts.unp_json, + opts.out_dir if not opts.checks_only else None, + opts.compress, + ) + print(f"... done with model {opts.modelcif}.") # TEST: to judge res. needed on cluster # import resource @@ -2522,3 +451,5 @@ def _main(): if __name__ == "__main__": _main() + +# LocalWords: gemmi diff --git a/projects/2024-12-ma-denv/uniprotkb.py b/projects/2024-12-ma-denv/uniprotkb.py new file mode 100644 index 0000000000000000000000000000000000000000..d934eabc278c83018fa84b9d689e2dc0342c4d4b --- /dev/null +++ b/projects/2024-12-ma-denv/uniprotkb.py @@ -0,0 +1,495 @@ +"""Functions to retrieve data for UniProtKB entries. +""" + +# Copyright (c) 2023, SIB - Swiss Institute of Bioinformatics and +# Biozentrum - University of Basel +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +from datetime import datetime +import os +import requests + +try: + # pylint: disable="bare-except" + import ujson as json +except: + import json + +import parasail + +# Name of UniProtKB as _ma_target_ref_db_details.db_name in a ModelCIF file, +# but 'pythonified' (no leading '_', no '.'). +MA_TARGET_REF_DB_DETAILS_DB_NAME = "UNP" + + +def translate_upkb_date_string(date_string): + """UniProtKB uses 3-letter month codes, which do not fly in other languages + (locales) than English, e.g. if the locale is set to de_DE, strptime() + expects 'MAI' instead of 'MAY'... switching the locale temporarily is also + not so easy (threads, not setting it back upon exceptions...). Hence, we + make the month a numeric value, here. + """ + for i, mon in enumerate( + [ + "JAN", + "FEB", + "MAR", + "APR", + "MAY", + "JUN", + "JUL", + "AUG", + "SEP", + "OCT", + "NOV", + "DEC", + ], + start=1, + ): + if mon in date_string: + return date_string.replace(mon, f"{i:02}") + + raise RuntimeError( + "Unrecognised UniProtKB date string found: " + f"'{date_string}'." + ) + + +class UniProtKBEntry: + """Deal with UniProtKB entries.""" + + # We ignore PEP8 on the number of attributes per class, here. The objective + # of UniProtKBEntry is to... represent a UniProtKB entry with all its + # data/ attributes. 
By the nature of an entry's meta-data, breaking it up + # into sub-classes seems counter-intuitive. + # Since this is more a data class than a functional class, we allow having + # too few public methods for now. + # pylint: disable=too-many-instance-attributes, too-few-public-methods + + def __init__(self, unp_ac, entry_version=None, json_data=None): + """Create a new UniProtKB entry object. + + UniProtKB API will be queried immediately on creation. + + :param unp_ac: Accession code of the UniProtKB entry to be fetched. + :type: unp_ac: :class:`str` + :param entry_version: Version of the UniPrtoKB entry to be fetched (not + to be mixed up with the UniProtKB release). + :type entry_version: :class:`str` or :class:`int` + :param json_data: Init object from JSON object, ignores unp_ac. + :type json_data: :class:`dict` + """ + if json_data is None: + self.unp_ac = unp_ac + # filled by self._fetch() + self.entry_status = None + self.entry_version = ( + int(entry_version) if entry_version is not None else None + ) + self.first_appearance = None + self.last_change = None + self.last_seq_change = None + self.ncbi_taxid = None + self.organism_species = "" + self.seq_version = None + self.seqlen = None + self.unp_crc64 = None + self.unp_details_full = None + self.unp_id = None + self.unp_seq = "" + self._fetch() + assert len(self.unp_seq) == self.seqlen + else: + self.entry_status = json_data["entry_status"] + self.entry_version = int(json_data["entry_version"]) + self.first_appearance = ( + datetime.fromisoformat(json_data["first_appearance"]) + if json_data["first_appearance"] is not None + else None + ) + self.last_change = ( + datetime.fromisoformat(json_data["last_change"]) + if json_data["last_change"] is not None + else None + ) + self.last_seq_change = ( + datetime.fromisoformat(json_data["last_seq_change"]) + if json_data["last_seq_change"] is not None + else None + ) + self.ncbi_taxid = json_data["ncbi_taxid"] + self.organism_species = json_data["organism_species"] + self.seq_version = json_data["seq_version"] + self.seqlen = json_data["seqlen"] + self.unp_ac = json_data["ac"] + self.unp_crc64 = json_data["crc64"] + self.unp_details_full = json_data["details_full"] + self.unp_id = json_data["id"] + self.unp_seq = json_data["seq"] + + def __str__(self): + """Print object as string.""" + return ( + f"<{__name__}.{type(self).__name__} AC={self.unp_ac} " + + f"version={self.entry_version}>" + ) + + def _parse_id_line(self, line): + """Parse a UniProtKB TXT format's ID line. + + Should support some older format versions, too.""" + sline = line.split() + if len(sline) != 5: + # Some old formats that are easy to fix + if len(sline) == 6 and sline[3] == "PRT;": + sline.pop(3) + else: + raise RuntimeError( + "ID line not conforming to 'ID EntryName" + + f"Status; SequenceLength.', found: {line}" + ) + self.unp_id = sline[1] + self.entry_status = sline[2][:-1].upper() + self.seqlen = int(sline[3]) + + def _parse_dt_line(self, line): + """Parse a UniProtKB TXT format's DT line. 
+ + Should support some older format versions, too.""" + sline = line.split() + sline[1] = translate_upkb_date_string(sline[1]) + if sline[2] == "(Rel.": # old format + if sline[4] == "Created)": + self.first_appearance = datetime.strptime(sline[1], "%d-%m-%Y") + self.entry_version = int(sline[3][:-1]) + elif sline[5] == "sequence": + self.last_seq_change = datetime.strptime(sline[1], "%d-%m-%Y") + self.seq_version = int(sline[3][:-1]) + elif sline[5] == "annotation": + self.last_change = datetime.strptime(sline[1], "%d-%m-%Y") + return + if sline[2] == "integrated": + self.first_appearance = datetime.strptime(sline[1], "%d-%m-%Y,") + elif sline[2] == "sequence": + self.last_seq_change = datetime.strptime(sline[1], "%d-%m-%Y,") + self.seq_version = int(sline[4][:-1]) + elif sline[2] == "entry": + self.last_change = datetime.strptime(sline[1], "%d-%m-%Y,") + self.entry_version = int(sline[4][:-1]) + + def _parse_de_line(self, line): + """Parse a UniProtKB TXT format's DE line(s).""" + sline = line.split() + if sline[1] == "RecName:": + if self.unp_details_full is None: + if sline[2].startswith("Full="): + self.unp_details_full = sline[2][len("Full=") :] + for i in sline[3:]: + if i.startswith("{"): + break + self.unp_details_full += f" {i}" + if self.unp_details_full.endswith(";"): + self.unp_details_full = self.unp_details_full[:-1] + break + + def _parse_os_line(self, line): + """Parse a UniProtKB TXT format's OS line(s).""" + osl = len("OS ") + if line[-1] == ".": + self.organism_species += line[osl:-1] + else: + self.organism_species += line[osl:-1] + " " + + def _parse_ox_line(self, line): + """Parse a UniProtKB TXT format's OX line.""" + # Taxonomy codes only come from NCBI atm. + sline = line.split("=") + self.ncbi_taxid = sline[-1][:-1] + self.ncbi_taxid = self.ncbi_taxid.split()[0] + + def _parse_sq_line(self, line): + """Parse a UniProtKB TXT format's SQ line.""" + sline = line.split() + self.unp_crc64 = sline[6] + + def _parse_sequence(self, line): + """Parse the sequence out of the UniProtKB TXT format.""" + sline = line.split() + self.unp_seq += "".join(sline) + + def _fetch(self): + """Retrieve information for a single UniProtKB entry.""" + if self.entry_version is None: + query_url = f"https://rest.uniprot.org/uniprotkb/{self.unp_ac}.txt" + else: + query_url = ( + f"https://rest.uniprot.org/unisave/{self.unp_ac}?format=txt&" + + f"versions={self.entry_version}" + ) + + rspns = requests.get(query_url, timeout=180) + if not rspns.ok: + raise RuntimeError( + f"UniProtKB entry with AC '{self.unp_ac}' not retrieved for " + + f"URL '{query_url}'" + ) + for line in rspns.iter_lines(decode_unicode=True): + # Check here to learn about UniProtKB's flat file format: + # https://web.expasy.org/docs/userman.html + # ID EntryName Status; SequenceLength. + if line.startswith("ID "): + self._parse_id_line(line) + # DT DD-MMM-YYYY, integrated into UniProtKB/database_name. + # DT DD-MMM-YYYY, sequence version x. + # DT DD-MMM-YYYY, entry version x. + elif line.startswith("DT "): + self._parse_dt_line(line) + # DE RecName: Full=Highly reducing ... tstA {ECO:...|PubMed:...}; + # DE Short=HR-PKS phiA {ECO:0000303|PubMed:26558485}; + # DE EC=2.3.1.- {ECO:0000269|PubMed:26558485}; + # DE AltName: Full=Phomoidride ... protein A {ECO:...|PubMed:...}; + # OS Avian leukosis RSA (RSV-SRA) (Rous sarcoma virus (strain + # OS Schmidt-Ruppin A)). 
+ elif line.startswith("DE "): + self._parse_de_line(line) + elif line.startswith("OS "): + self._parse_os_line(line) + # OX Taxonomy_database_Qualifier=Taxonomic code; + elif line.startswith("OX NCBI_TaxID="): + self._parse_ox_line(line) + # SQ SEQUENCE 3392 AA; 378905 MW; BBD894175578E164 CRC64; + elif line.startswith("SQ "): + self._parse_sq_line(line) + # Seqeunce is stored special, w/o prefix and multiline + elif line.startswith(" "): + self._parse_sequence(line) + # print(line) + + def to_json(self): + """Return entry as JSON object.""" + return { + "ac": self.unp_ac, + "entry_version": self.entry_version, + "organism_species": self.organism_species, + "entry_status": self.entry_status, + "first_appearance": ( + datetime.isoformat(self.first_appearance) + if self.first_appearance is not None + else None + ), + "last_change": ( + datetime.isoformat(self.last_change) + if self.last_change is not None + else None + ), + "last_seq_change": ( + datetime.isoformat(self.last_seq_change) + if self.last_seq_change is not None + else None + ), + "ncbi_taxid": self.ncbi_taxid, + "seq_version": self.seq_version, + "seqlen": self.seqlen, + "details_full": self.unp_details_full, + "id": self.unp_id, + "crc64": self.unp_crc64, + "seq": self.unp_seq, + } + + +class UniProtKBEntryCache: + """Cached retrieval of UniProtKB entries. + + To avoid calling the UniProtKB API for the same UniProtKB AC multiple times, + use this cache. Also helps with keeping code cleaner since you do not need + to carry UniProtKBEntry instances around, just call the cached entry. + + Be careful about UniProtKB entry versions. In theory, when specifying no + version for the cache, the real entry may change at UniProtKB while + running... but this is also an issue when loading the entry live from + UniProtKB multiple times. + + Also be aware that the cache has no size restrictions, it does not get swept + over its whole lifetime.""" + + # The cache serves as a data-dump, storing and handing out instances. No + # need for a lot of public methods, so we ignore PEP, here. + # pylint: disable=too-few-public-methods + def __init__(self, json_cache_file=None): + """Set up the cache.""" + self._cache = {} + self._cache_file = json_cache_file + + # try to fill the cache from file + if ( + self._cache_file is not None + and os.path.exists(self._cache_file) + and os.stat(self._cache_file).st_size != 0 + ): + with open(self._cache_file, encoding="utf8") as jfh: + self._cache = self._from_json(json.load(jfh)) + + def get(self, unp_ac, entry_version=None): + """Get an UniProtKBEntry from the cache. 
+ + If the entry can not be found, it will be fetched from the UniProtKB + API.""" + try: + return self._cache[unp_ac][entry_version] + except KeyError: + unp = UniProtKBEntry(unp_ac, entry_version=entry_version) + if unp_ac not in self._cache: + self._cache[unp_ac] = {} + self._cache[unp_ac][entry_version] = unp + # if we end up here, store the cache on disk + if self._cache_file is not None: + with open(self._cache_file, "w", encoding="utf8") as jfh: + json.dump(self.to_json(), jfh) + + return self._cache[unp_ac][entry_version] + + def to_json(self): + """Turn the cache into a JSON object.""" + data = {} + for acc, versions in self._cache.items(): + data[acc] = {} + for version, entry in versions.items(): + data[acc][version] = entry.to_json() + + return data + + def _from_json(self, data): + """Used to initialise the cache from a JSON object.""" + cache = {} + for acc, versions in data.items(): + cache[acc] = {} + for version, entry in versions.items(): + version = int(version) if version != "null" else None + cache[acc][version] = UniProtKBEntry(None, json_data=entry) + + return cache + + def match_sequence(self, unp_ac, sequence, start, end): + """Match a sequence with the sequence of a UNP entry. + + As the UNP sequence can change over time, walk through the versions + until exact match or return the best match. + + :param unp_ac: UniProtKB Accession. + :type unp_ac: :class:`str` + :param sequence: Target sequence of the model. + :type sequence: :class:`str` + :param start: Start residue of the alignment, 1-based. + :type start: :class:`int` + :param end: End residue of the alignment 1-based. + :type end: :class:`int` + """ + + # This function was first written for a bulk deposition that came with + # alignment range in the UNP sequence, UNP AC but no UNP entry version. + # So this function works with this for now. Later use cases should be + # incorporated as they occur. E.g. having no range (start, end would be + # None) should search for the best alignment and use the range from + # this in the ModelCIF file. + + def aln(unp, trg, start, end): + unp = unp[start:end] + # trg = parasail's query + # unp = parasail's ref + # Since we have a fixed range, we expect a global alignment, use + # parasail's NW variant. + alignment = parasail.nw_trace_scan_sat( + trg, unp, 5, 2, parasail.blosum62 + ) + # get aln boundaries - assuming a NW (global) alignment + db_aln_start = ( + start + len(unp) - len(alignment.traceback.query.lstrip("-")) + ) + db_aln_end = db_aln_start + len( + alignment.traceback.query.rstrip("-") + ) + seq_aln_start = len(trg) - len(alignment.traceback.ref.lstrip("-")) + seq_aln_end = seq_aln_start + len( + alignment.traceback.query.replace("-", "") + ) + # print("TRG", alignment.traceback.query) + # print("UNP", alignment.traceback.ref) + # print(f"TRG {seq_aln_start} {seq_aln_end}") + return ( + db_aln_start + 1, + db_aln_end, + seq_aln_start + 1, + seq_aln_end, + alignment.traceback.ref, + ) + + entry = self.get(unp_ac) + # depositor has provided alignment range, so we only align that region + db_aln_start, db_aln_end, seq_aln_start, seq_aln_end, aln_unp = aln( + entry.unp_seq, sequence, start - 1, end + ) + # print( + # f"{entry.entry_version} - start: {start}/ {db_aln_start+1} end: " + # + f"{end}/ {db_aln_end} len: {len(sequence)} " + # + f"({end-start})" + # ) + # Check that the alignment fits... make sure the aligned UNP sequence + # does not contain gaps, gaps in the UNP sequence would mean deleted + # residues in the target sequence. 
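+        # (the aligned range must also match the depositor-given start/end
+        #  exactly; mere mismatching residues do not trigger the fallback to
+        #  older entry versions below)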
+ if ( + db_aln_start == start + and db_aln_end == end + and aln_unp.find("-") == -1 + ): + return ( + entry, + (db_aln_start, db_aln_end), + (seq_aln_start, seq_aln_end), + ) + version = entry.entry_version + while version > 1: + version -= 1 + entry = self.get(unp_ac, entry_version=version) + db_aln_start, db_aln_end, seq_aln_start, seq_aln_end, aln_unp = ( + aln(entry.unp_seq, sequence, start - 1, end) + ) + # print( + # f"{version} - start: {start}/ {db_aln_start+1} end: {end}/ " + # + f"{db_aln_end} len: {len(sequence)} ({end-start})" + # ) + if ( + db_aln_start == start + and db_aln_end == end + and aln_unp.find("-") == -1 + ): + return ( + entry, + (db_aln_start, db_aln_end), + (seq_aln_start, seq_aln_end), + ) + # print("FOUND", entry.unp_seq.find(sequence)) + # if we end up here, there was no nice alignment + # ToDo: use _abort_msg here, once there is a proper Python module + raise RuntimeError( + f"Could not find a proper alignment in region {start}-{end} for " + + f"{unp_ac}." + ) + + +# LocalWords: de strptime UniProtKBEntry
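################################################################################
# Illustrative sketch, not part of the patch: the call pattern that
# _update_modelcif() uses on top of the _MABlock wrapper in
# translate2modelcif.py. "model.cif", the item values and the reach into the
# script's private helper are placeholders for illustration only; the input
# must be a ModelCIF file that already carries the categories being read or
# edited (add_to_category()/targets raise if they cannot find them).

from translate2modelcif import _MABlock

block = _MABlock("model.cif")
target = block.polymer_targets[0]

# overwrite single items in existing categories
block.add_to_category("struct", title="Model of a dengue virus NS1 protein")
block.add_to_category(
    "entity",
    pdbx_description="Non-structural protein 1",
    match=("id", target["entity_id"]),
)

# append a new single-row category and slot it in right after _entity.
block.add_category(
    "struct_ref",
    id=["1"],
    entity_id=[target["entity_id"]],
    db_name=["UNP"],
    after="entity",
)

# gzip-compressed output next to the (hypothetical) input
block.write_file("model-updated.cif.gz", compress=True)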
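################################################################################
# Illustrative sketch, not part of the patch: driving the UniProtKBEntryCache
# from uniprotkb.py on its own. The accession, target sequence, residue range
# and cache-file name are placeholders; get()/match_sequence() hit the live
# UniProtKB REST API for every entry version that is not in the JSON cache yet.

from uniprotkb import UniProtKBEntryCache


def sketch_unp_lookup(unp_ac, target_seq, unp_start, unp_end):
    """Fetch a UniProtKB entry and match a model target sequence against it."""
    cache = UniProtKBEntryCache(json_cache_file="unp_cache.json")

    # plain cached retrieval of the current entry version
    entry = cache.get(unp_ac)
    print(entry.unp_id, entry.entry_version, entry.last_seq_change)

    # walk back through entry versions until the target aligns gap-free over
    # the depositor-given range; raises RuntimeError if no version fits
    entry, db_rng, seq_rng = cache.match_sequence(
        unp_ac, target_seq, unp_start, unp_end
    )
    # db_rng/seq_rng are the 1-based UNP and target ranges that would feed
    # struct_ref_seq and ma_target_ref_db_details in the ModelCIF file
    print("UNP range:", db_rng, "target range:", seq_rng)


if __name__ == "__main__":
    # placeholder arguments; in the pipeline they come from the model info CSV
    # and from the _entity_poly_seq sequence of the ModelCIF file
    sketch_unp_lookup("P12345", "MKTAYIAKQR", 1, 10)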
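################################################################################
# Illustrative sketch, not part of the patch: why translate_upkb_date_string()
# exists. UniProtKB DT lines carry English 3-letter month codes that strptime()
# only accepts under an English locale; rewriting the month as a number keeps
# the parsing locale-independent. The DT line below is a made-up but
# format-conformant example of what UniProtKBEntry._parse_dt_line() consumes.

from datetime import datetime

from uniprotkb import translate_upkb_date_string

dt_line = "DT   23-JAN-2007, entry version 42."
day_token = dt_line.split()[1]                      # '23-JAN-2007,'
day_token = translate_upkb_date_string(day_token)   # '23-01-2007,'
parsed = datetime.strptime(day_token, "%d-%m-%Y,")
print(parsed.date())                                # 2007-01-23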