Commit c9043198 authored by Bienchen

Added converter script

parent e47db467
#! /usr/local/bin/ost
# -*- coding: utf-8 -*-
"""Translate models for Sergey from PDB + extra data into ModelCIF."""
# EXAMPLES for running:
"""
ost translate2modelcif.py ./raw_data ./raw_data/ptm_plddt.all.txt \
./web_dloads/pivot ./modelcif --prefix=F000347 \
--pdb-web-path=./web_dloads/pdb \
--refseq-path=./web_dloads/consensus_all.fasta
"""
# NOTE: add "--compress" for final runs
from timeit import default_timer as timer
import argparse
import filecmp
import gzip
import os
import pandas as pd
import shutil
import sys
import zipfile
import ihm
import ihm.citations
import modelcif
import modelcif.associated
import modelcif.data
import modelcif.dumper
import modelcif.model
import modelcif.protocol
import modelcif.qa_metric
import modelcif.reference
from ost import io
def _abort_msg(msg, exit_code=1):
"""Write error message and exit with exit_code."""
print(f"{msg}\nAborting.", file=sys.stderr)
sys.exit(exit_code)
def _warn_msg(msg):
"""Write a warning message to stdout."""
print(f"WARNING: {msg}")
def _check_file(file_path):
"""Make sure a file exists and is actually a file."""
if not os.path.exists(file_path):
_abort_msg(f"File not found: '{file_path}'.")
if not os.path.isfile(file_path):
_abort_msg(f"File path does not point to file: '{file_path}'.")
def _check_folder(dir_path):
"""Make sure a file exists and is actually a file."""
if not os.path.exists(dir_path):
_abort_msg(f"Path not found: '{dir_path}'.")
if not os.path.isdir(dir_path):
_abort_msg(f"Path does not point to a directory: '{dir_path}'.")
def _check_opts_folder(dir_path):
"""Remove trailing '/' (return fixed one) and check if path valid."""
if dir_path.endswith("/"):
dir_path = dir_path[:-1]
_check_folder(dir_path)
return dir_path
def _parse_args():
"""Parse command line arguments."""
parser = argparse.ArgumentParser(
formatter_class=argparse.RawDescriptionHelpFormatter,
description=__doc__,
)
parser.add_argument(
"model_base_dir",
type=str,
metavar="<MODEL BASE DIR>",
help="Directory with pub_data* directories with model PDBs.",
)
parser.add_argument(
"metadata_file",
type=str,
metavar="<METADATA FILE>",
help="Path to table with metadata.",
)
parser.add_argument(
"msa_data_dir",
type=str,
metavar="<MSA DIR>",
help="Directory with F*.fasta files for custom MSAs.",
)
parser.add_argument(
"out_dir",
type=str,
metavar="<OUTPUT DIR>",
help="Path to directory to store results.",
)
parser.add_argument(
"--prefix",
type=str,
metavar="<PREFIX>",
default="",
help="Only process families starting with given prefix. By default " \
+ "all families are processed.",
)
parser.add_argument(
"--compress",
default=False,
action="store_true",
help="Compress ModelCIF file with gzip "
"(note that acc. data is zipped either way).",
)
parser.add_argument(
"--pdb-web-path",
type=str,
metavar="<PDB WEB PATH>",
default=None,
help="Optional path to directory with F*.pdb files as available on " \
+ "NMPFamsDB web site. Used to check if top ranked model the same.",
)
parser.add_argument(
"--refseq-path",
type=str,
metavar="<PDB REFSEQ PATH>",
default=None,
help="Optional path to fasta file with all ref. seq. as available on " \
+ "NMPFamsDB web site. Used to check if it matches with MSA.",
)
opts = parser.parse_args()
# check input
opts.model_base_dir = _check_opts_folder(opts.model_base_dir)
_check_file(opts.metadata_file)
opts.msa_data_dir = _check_opts_folder(opts.msa_data_dir)
# check out_dir
if opts.out_dir.endswith("/"):
opts.out_dir = opts.out_dir[:-1]
if not os.path.exists(opts.out_dir):
os.makedirs(opts.out_dir)
if not os.path.isdir(opts.out_dir):
_abort_msg(f"Path '{opts.out_dir}' does not point to a directory.")
# check optional paths
if opts.pdb_web_path is not None:
opts.pdb_web_path = _check_opts_folder(opts.pdb_web_path)
if opts.refseq_path is not None:
_check_file(opts.refseq_path)
return opts
# pylint: disable=too-few-public-methods
class _GlobalPTM(modelcif.qa_metric.Global, modelcif.qa_metric.PTM):
"""Predicted accuracy according to the TM-score score in [0,1]"""
name = "pTM"
software = None
class _GlobalPLDDT(modelcif.qa_metric.Global, modelcif.qa_metric.PLDDT):
"""Predicted accuracy according to the CA-only lDDT in [0,100]"""
name = "pLDDT"
software = None
class _LocalPLDDT(modelcif.qa_metric.Local, modelcif.qa_metric.PLDDT):
"""Predicted accuracy according to the CA-only lDDT in [0,100]"""
name = "pLDDT"
software = None
class _NmpfamsdbTrgRef(modelcif.reference.TargetReference):
"""PDB as target reference."""
name = "Other"
other_details = "NMPFamsDB"
# pylint: enable=too-few-public-methods
def _get_res_num(r, use_auth=False):
"""Get res. num. from auth. IDs if reading from mmCIF files."""
if use_auth:
return int(r.GetStringProp("pdb_auth_resnum"))
else:
return r.number.num
def _get_ch_name(ch, use_auth=False):
"""Get chain name from auth. IDs if reading from mmCIF files."""
if use_auth:
return ch.GetStringProp("pdb_auth_chain_name")
else:
return ch.name
class _OST2ModelCIF(modelcif.model.AbInitioModel):
"""Map OST entity elements to ihm.model"""
def __init__(self, *args, **kwargs):
"""Initialise a model"""
for i in ["ost_entity", "asym", "scores_json"]:
if i not in kwargs:
raise TypeError(f"Required keyword argument '{i}' not found.")
self.ost_entity = kwargs.pop("ost_entity")
self.asym = kwargs.pop("asym")
self.scores_json = kwargs.pop("scores_json")
# use auth IDs for res. nums and chain names
self.use_auth = False
# fetch plddts per residue
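        # NOTE: AF2 PDB output stores per-residue pLDDT in the B-factor column,
        #       so all atoms of a residue should carry the same value
        #       (enforced by the assert below)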
self.plddts = []
for res in self.ost_entity.residues:
b_factors = [a.b_factor for a in res.atoms]
assert len(set(b_factors)) == 1 # must all be equal!
self.plddts.append(b_factors[0])
super().__init__(*args, **kwargs)
def get_atoms(self):
        # ToDo [internal]: Take B-factor out since it's not a B-factor?
# NOTE: this assumes that _get_res_num maps residue to pos. in seqres
# within asym
for atm in self.ost_entity.atoms:
yield modelcif.model.Atom(
asym_unit=self.asym[_get_ch_name(atm.chain, self.use_auth)],
seq_id=_get_res_num(atm.residue, self.use_auth),
atom_id=atm.name,
type_symbol=atm.element,
x=atm.pos[0],
y=atm.pos[1],
z=atm.pos[2],
het=atm.is_hetatom,
biso=atm.b_factor,
occupancy=atm.occupancy,
)
def add_scores(self):
"""Add QA metrics from AF2 scores."""
# global scores
self.qa_metrics.extend(
(
_GlobalPLDDT(self.scores_json["plddt_global"]),
_GlobalPTM(self.scores_json["ptm"]),
)
)
# local scores
i = 0
for chn_i in self.ost_entity.chains:
ch_name = _get_ch_name(chn_i, self.use_auth)
for res_i in chn_i.residues:
# local pLDDT
res_num = _get_res_num(res_i, self.use_auth)
self.qa_metrics.append(
_LocalPLDDT(
self.asym[ch_name].residue(res_num),
self.plddts[i],
)
)
i += 1
def _get_audit_authors():
"""Return the list of authors that produced this model."""
return (
"Pavlopoulos, Georgios A.",
"Baltoumas, Fotis A.",
"Liu, Sirui",
"Selvitopi, Oguz",
"Camargo, Antonio Pedro",
"Nayfach, Stephen",
"Azad, Ariful",
"Roux, Simon",
"Call, Lee",
"Ivanova, Natalia N.",
"Chen, I-Min",
"Paez-Espino, David",
"Karatzas, Evangelos",
"Novel Metagenome Protein Families Consortium",
"Iliopoulos, Ioannis",
"Konstantinidi, Konstantinos",
"Tiedje, James M.",
"Pett-Ridge, Jennifer",
"Baker, David",
"Visel, Axel",
"Ouzounis, Christos A.",
"Ovchinnikov, Sergey",
"Buluc, Aydin",
"Kyrpides, Nikos C.",
)
def _get_metadata(metadata_file):
"""Read csv file with metedata and prepare for next steps."""
metadata = pd.read_csv(metadata_file, sep=' ',
names=["ID", "mdl", "pTM", "pLDDT"])
return metadata
def _get_pdb_files(model_base_dir):
"""Collect PDB files from pub_data_* folders.
Returns dict with key = family name and value = list of paths to PDB files.
"""
pdb_files_split = dict() # to return
pdb_files_raw = set() # to check for duplicates
pub_paths = [f for f in os.listdir(model_base_dir) \
if f.startswith("pub_data_")]
# NOTE: we sort pub_paths to ensure that pub_data_02 is before _03
for pub_path in sorted(pub_paths):
sub_path = os.path.join(model_base_dir, pub_path)
pdb_files_new = [f for f in os.listdir(sub_path) \
if not f.startswith(".")]
for f in pdb_files_new:
f_path = os.path.join(sub_path, f)
f_name = f.split('_')[0]
if f_name in pdb_files_split:
pdb_files_split[f_name].append(f_path)
else:
pdb_files_split[f_name] = [f_path]
# check global list
pdb_files_new_set = set(pdb_files_new)
new_duplicates = pdb_files_raw.intersection(pdb_files_new_set)
if new_duplicates:
_warn_msg(f"{len(new_duplicates)} duplicated files found in " \
f"{sub_path}.")
pdb_files_raw = pdb_files_raw.union(pdb_files_new_set)
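    # e.g. (illustrative): {"F000347": ["<base dir>/pub_data_02/F000347_model_1.pdb", ...]}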
return pdb_files_split
def _get_config():
"""Define AF setup."""
msa_description = "MSA created by calculating the central or \"pivot\" " \
"sequence of each seed MSA, and refining each " \
"alignment using that sequence as the guide."
mdl_description = "Model generated using AlphaFold (v2.0.0 with models " \
"finetuned to return ptm weights) producing 5 models, " \
"without model relaxation, without templates, ranked " \
"by pLDDT, starting from a custom MSA."
af_config = {}
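    # NOTE: af_config is left empty here; any known AF run parameters would be
    #       reported as software_parameters of the modelling protocol step
    #       (see _get_protocol_steps_and_software)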
return {
"af_config": af_config,
"af_version": "2.0.0",
"mdl_description": mdl_description,
"msa_description": msa_description,
"use_templates": False,
"use_small_bfd": False,
"use_multimer": False,
}
def _get_protocol_steps_and_software(config_data):
"""Create the list of protocol steps with software and parameters used."""
protocol = []
# MSA step
step = {
"method_type": "coevolution MSA",
"name": None,
"details": config_data["msa_description"],
}
step["input"] = "target_sequences"
step["output"] = "MSA"
step["software"] = []
step["software_parameters"] = {}
protocol.append(step)
# modelling step
step = {
"method_type": "modeling",
"name": None,
"details": config_data["mdl_description"],
}
# get input data
# Must refer to data already in the JSON, so we try keywords
step["input"] = "target_sequences_and_MSA"
# get output data
# Must refer to existing data, so we try keywords
step["output"] = "model"
# get software
if config_data["use_multimer"]:
step["software"] = [{
"name": "AlphaFold-Multimer",
"classification": "model building",
"description": "Structure prediction",
"citation": ihm.Citation(
pmid=None,
title="Protein complex prediction with "
+ "AlphaFold-Multimer.",
journal="bioRxiv",
volume=None,
page_range=None,
year=2021,
authors=[
"Evans, R.",
"O'Neill, M.",
"Pritzel, A.",
"Antropova, N.",
"Senior, A.",
"Green, T.",
"Zidek, A.",
"Bates, R.",
"Blackwell, S.",
"Yim, J.",
"Ronneberger, O.",
"Bodenstein, S.",
"Zielinski, M.",
"Bridgland, A.",
"Potapenko, A.",
"Cowie, A.",
"Tunyasuvunakool, K.",
"Jain, R.",
"Clancy, E.",
"Kohli, P.",
"Jumper, J.",
"Hassabis, D.",
],
doi="10.1101/2021.10.04.463034",
),
"location": "https://github.com/deepmind/alphafold",
"type": "package",
"version": config_data["af_version"],
}]
else:
step["software"] = [{
"name": "AlphaFold",
"classification": "model building",
"description": "Structure prediction",
"citation": ihm.citations.alphafold2,
"location": "https://github.com/deepmind/alphafold",
"type": "package",
"version": config_data["af_version"],
}]
step["software_parameters"] = config_data["af_config"]
protocol.append(step)
return protocol
def _get_title(fam_name):
"""Get a title for this modelling experiment."""
return f"AlphaFold model for NMPFamsDB Family {fam_name}"
def _get_model_details(fam_name):
"""Get the model description."""
db_url = f"https://bib.fleming.gr/NMPFamsDB/family?id={fam_name}"
# TODO: check if ok to use HTML for the URL
db_url = f"<a href=\"{db_url}\" target=\"_blank\">{db_url}</a>"
return f"Model generated using AlphaFold (v2.0.0) for the " \
f"\"Representative Sequence\" of NMPFamsDB Metagenome / " \
f"Metatranscriptome Family {fam_name}.\n\nSee {db_url} for " \
f"additional details."
def _get_model_group_name():
"""Get a name for a model group."""
return None
def _get_sequence(chn, use_auth=False):
"""Get the sequence out of an OST chain incl. '-' for gaps in resnums."""
# initialise (add gaps if first is not at num. 1)
lst_rn = _get_res_num(chn.residues[0], use_auth)
idx = 1
sqe = "-" * (lst_rn - 1) \
+ chn.residues[0].one_letter_code
for res in chn.residues[idx:]:
lst_rn += 1
while lst_rn != _get_res_num(res, use_auth):
sqe += "-"
lst_rn += 1
sqe += res.one_letter_code
return sqe
def _get_entities(pdb_file, ref_seq, fam_name):
"""Gather data for the mmCIF (target) entities."""
_check_file(pdb_file)
ost_ent = io.LoadPDB(pdb_file)
if ost_ent.chain_count != 1:
raise RuntimeError(
f"Unexpected oligomer in {pdb_file}"
)
chn = ost_ent.chains[0]
sqe_gaps = _get_sequence(chn)
# NOTE: can have gaps to accommodate "X" in ref_seq
exp_seq = sqe_gaps.replace('-', 'X')
len_diff = len(ref_seq.string) - len(exp_seq)
if len_diff > 0:
exp_seq += 'X' * len_diff
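    # e.g. (illustrative): model seq. "AB-C" and ref. seq. "ABXCX" give
    # exp_seq "ABXC" -> "ABXCX", which passes the check below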
if exp_seq != ref_seq.string:
raise RuntimeError(f"Sequence in {pdb_file} does not match ref_seq")
# TODO: waiting for input on whether they change handling of "X" in seq
# -> HERE: assuming that "X" were in modelled sequence and PDB can have gaps
cif_ent = {
"seqres": ref_seq.string, # HACK for testing: .replace('X', 'A')
"pdb_sequence": sqe_gaps,
"pdb_chain_id": [_get_ch_name(chn, False)],
"fam_name": fam_name,
"description": f"Representative Sequence of NMPFamsDB Family {fam_name}"
}
return [cif_ent], ost_ent
def _get_modelcif_entities(target_ents, asym_units, system):
"""Create ModelCIF entities and asymmetric units."""
for cif_ent in target_ents:
mdlcif_ent = modelcif.Entity(
# NOTE: sequence here defines residues in model!
cif_ent["seqres"],
description=cif_ent["description"],
source=None,
references=[
_NmpfamsdbTrgRef(
cif_ent["fam_name"],
cif_ent["fam_name"],
align_begin=1,
align_end=len(cif_ent["seqres"])
)
],
)
# NOTE: this assigns (potentially new) alphabetic chain names
for pdb_chain_id in cif_ent["pdb_chain_id"]:
asym_units[pdb_chain_id] = modelcif.AsymUnit(
mdlcif_ent, strand_id=pdb_chain_id,
)
system.target_entities.append(mdlcif_ent)
def _get_assoc_aln_file(fle_path):
"""Generate a modelcif.associated.File object pointing to FASTA formatted
file containing MSA.
"""
cfile = modelcif.associated.File(
fle_path,
details="Custom MSA for modelling",
data=modelcif.data.Data("Custom MSA for modelling")
)
cfile.file_format = "fasta"
cfile.file_content = "multiple sequence alignments"
return cfile
def _get_associated_files(mdl_name, arc_files):
"""Create entry for associated files."""
# package all into zip file
return modelcif.associated.Repository(
"",
[modelcif.associated.ZipFile(f"{mdl_name}.zip", files=arc_files)],
)
# NOTE: by convention, ModelArchive (MA) expects the zip file to have the same
#       name as the ModelCIF file
def _assemble_modelcif_software(soft_dict):
"""Create a modelcif.Software instance from dictionary."""
return modelcif.Software(
soft_dict["name"],
soft_dict["classification"],
soft_dict["description"],
soft_dict["location"],
soft_dict["type"],
soft_dict["version"],
citation=soft_dict["citation"],
)
def _get_modelcif_protocol_software(js_step):
"""Assemble software entries for a ModelCIF protocol step."""
if js_step["software"]:
if len(js_step["software"]) == 1:
sftwre = _assemble_modelcif_software(js_step["software"][0])
else:
sftwre = []
for sft in js_step["software"]:
sftwre.append(_assemble_modelcif_software(sft))
sftwre = modelcif.SoftwareGroup(elements=sftwre)
if js_step["software_parameters"]:
params = []
for key, val in js_step["software_parameters"].items():
params.append(modelcif.SoftwareParameter(key, val))
if isinstance(sftwre, modelcif.SoftwareGroup):
sftwre.parameters = params
else:
sftwre = modelcif.SoftwareGroup(
elements=(sftwre,), parameters=params
)
return sftwre
return None
def _get_modelcif_protocol_data(data_label, target_entities, aln_data, model):
"""Assemble data for a ModelCIF protocol step."""
if data_label == "target_sequences":
data = modelcif.data.DataGroup(target_entities)
elif data_label == "MSA":
data = aln_data
elif data_label == "target_sequences_and_MSA":
data = modelcif.data.DataGroup(target_entities)
data.append(aln_data)
elif data_label == "model":
data = model
else:
raise RuntimeError(f"Unknown protocol data: '{data_label}'")
return data
def _get_modelcif_protocol(protocol_steps, target_entities, aln_data, model):
"""Create the protocol for the ModelCIF file."""
protocol = modelcif.protocol.Protocol()
for js_step in protocol_steps:
sftwre = _get_modelcif_protocol_software(js_step)
input_data = _get_modelcif_protocol_data(
js_step["input"], target_entities, aln_data, model
)
output_data = _get_modelcif_protocol_data(
js_step["output"], target_entities, aln_data, model
)
protocol.steps.append(
modelcif.protocol.Step(
input_data=input_data,
output_data=output_data,
name=js_step["name"],
details=js_step["details"],
software=sftwre,
)
)
protocol.steps[-1].method_type = js_step["method_type"]
return protocol
def _compress_cif_file(cif_file):
"""Compress cif file and delete original."""
with open(cif_file, 'rb') as f_in:
with gzip.open(cif_file + '.gz', 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
os.remove(cif_file)
def _package_associated_files(repo):
"""Compress associated files into single zip file and delete original."""
# zip settings tested for good speed vs compression
for archive in repo.files:
with zipfile.ZipFile(archive.path, "w", zipfile.ZIP_BZIP2) as cif_zip:
for zfile in archive.files:
cif_zip.write(zfile.path, arcname=zfile.path)
os.remove(zfile.path)
def _store_as_modelcif(data_json, ost_ent, out_dir, mdl_name, compress):
"""Mix all the data into a ModelCIF file."""
print(" generating ModelCIF objects...", end="")
pstart = timer()
# create system to gather all the data
system = modelcif.System(
title=data_json["title"],
id=data_json["mdl_id"].upper(),
model_details=data_json["model_details"],
)
# create an asymmetric unit and an entity per target sequence
asym_units = {}
_get_modelcif_entities(
data_json["target_entities"], asym_units, system
)
# audit_authors
system.authors.extend(data_json["audit_authors"])
# set up the model to produce coordinates
model = _OST2ModelCIF(
assembly=modelcif.Assembly(asym_units.values()),
asym=asym_units,
ost_entity=ost_ent,
scores_json=data_json,
name=data_json["mdl_name"],
)
print(f" ({timer()-pstart:.2f}s)")
print(" processing QA scores...", end="", flush=True)
pstart = timer()
model.add_scores()
print(f" ({timer()-pstart:.2f}s)")
model_group = modelcif.model.ModelGroup(
[model], name=data_json["model_group_name"]
)
system.model_groups.append(model_group)
# handle additional files
aln_file = _get_assoc_aln_file(data_json["aln_file_name"])
system.repositories.append(
_get_associated_files(mdl_name, [aln_file])
)
# get data and steps
protocol = _get_modelcif_protocol(
data_json["protocol"], system.target_entities, aln_file.data, model,
)
system.protocols.append(protocol)
# write modelcif System to file (NOTE: no PAE here!)
print(" write to disk...", end="", flush=True)
pstart = timer()
    # copy aln file into out_dir so it can be packaged (and removed) below
shutil.copyfile(
data_json["aln_file_path"],
os.path.join(out_dir, data_json["aln_file_name"])
)
# NOTE: we change path and back while being exception-safe to handle zipfile
oldpwd = os.getcwd()
os.chdir(out_dir)
mdl_fle = f"{mdl_name}.cif"
try:
with open(mdl_fle, "w", encoding="ascii") as mmcif_fh:
modelcif.dumper.write(mmcif_fh, [system])
_package_associated_files(system.repositories[0])
if compress:
_compress_cif_file(mdl_fle)
mdl_fle += ".gz"
finally:
os.chdir(oldpwd)
print(f" ({timer()-pstart:.2f}s)")
def _translate2modelcif(f_name, opts, metadata_fam, pdb_files, ref_seq_check):
"""Convert a model with its accompanying data to ModelCIF."""
# TODO: unclear what to do with such cases; skipped for now
if len(metadata_fam) != 5:
_warn_msg(f"Unexpected number of {len(metadata_fam)} models in " \
f"metadata for family {f_name}. Skipping...")
return
#
mdl_id = f_name
# skip if done already
if opts.compress:
cifext = "cif.gz"
else:
cifext = "cif"
mdl_path = os.path.join(opts.out_dir, f"{mdl_id}.{cifext}")
if os.path.exists(mdl_path):
print(f" {mdl_id} already done...")
return
# go for it...
print(f" translating {mdl_id}...")
pdb_start = timer()
# get aln_data and ref. seq. for this entry
aln_file = f"{f_name}.fasta"
aln_path = os.path.join(opts.msa_data_dir, aln_file)
# TODO: if we need to handle files without ALN, this needs fixing
# -> e.g. 11 extra models in pub_data_* cannot be handled right now
if not os.path.exists(aln_path):
_warn_msg(f"Cannot deal with missing MSA for {f_name} (yet). " \
f"Skipping...")
return
#
aln = io.LoadAlignment(aln_path) # note: this checks that it's an actual MSA
ref_seq = aln.sequences[0]
if ref_seq_check is not None and ref_seq_check.string != ref_seq.string:
raise RuntimeError(f"Sequence mismatch for {f_name}")
# TODO: allow "X" (or whatever can be used to label unknown AA) if needed
if "X" in ref_seq.string:
_warn_msg(f"Cannot deal with 'X' in ref_seq for {f_name} (yet). " \
f"Skipping...")
return
#
# gather data into JSON-like structure
print(" preparing data...", end="")
pstart = timer()
config_data = _get_config()
mdlcf_json = {}
mdlcf_json["audit_authors"] = _get_audit_authors()
mdlcf_json["protocol"] = _get_protocol_steps_and_software(config_data)
mdlcf_json["config_data"] = config_data
mdlcf_json["mdl_id"] = mdl_id
mdlcf_json["aln_file_name"] = aln_file
mdlcf_json["aln_file_path"] = aln_path
# find model to process
# TODO: here just top pLDDT model processed; extend for more if needed...
top_metadata = metadata_fam.loc[metadata_fam.pLDDT.idxmax()]
pdb_list_sel = [f for f in pdb_files if top_metadata.mdl in f]
if len(pdb_list_sel) != 1:
# this should only happen if duplicated file in pub_data_*
# TODO: for now no warning shown and we just pick first hit
# -> first hit is from lowest "pub_data_*"
# -> unclear if we should worry about it...
pass
mdlcf_json["mdl_name"] = f"Top ranked model ({top_metadata.mdl})"
# get scores for this entry
mdlcf_json["plddt_global"] = top_metadata.pLDDT
mdlcf_json["ptm"] = top_metadata.pTM
# process coordinates
pdb_file = pdb_list_sel[0]
target_entities, ost_ent = _get_entities(pdb_file, ref_seq, f_name)
mdlcf_json["target_entities"] = target_entities
# sanity check (only for top ranked model!)
if opts.pdb_web_path is not None:
pdb_file_web = os.path.join(opts.pdb_web_path, f"{f_name}.pdb")
if not filecmp.cmp(pdb_file, pdb_file_web):
raise RuntimeError(f"PDB file mismatch web vs top-ranked for " \
f"{f_name}")
# fill annotations
mdlcf_json["title"] = _get_title(f_name)
mdlcf_json["model_details"] = _get_model_details(f_name)
mdlcf_json["model_group_name"] = _get_model_group_name()
print(f" ({timer()-pstart:.2f}s)")
# save ModelCIF
_store_as_modelcif(mdlcf_json, ost_ent, opts.out_dir, mdl_id, opts.compress)
# check if result can be read and has expected seq.
ent, ss = io.LoadMMCIF(mdl_path, seqres=True)
exp_seqs = [trg_ent["pdb_sequence"] \
for trg_ent in mdlcf_json["target_entities"]]
assert ent.chain_count == len(exp_seqs), f"Bad chain count {mdl_id}"
# here we expect auth = label IDs
ent_seq = "".join([_get_sequence(chn, False) for chn in ent.chains])
ent_seq_a = "".join([_get_sequence(chn, True) for chn in ent.chains])
assert ent_seq == ent_seq_a
assert ent_seq == "".join(exp_seqs), f"Bad seq. {mdl_id}"
ent_seqres = "".join([ss.FindSequence(chn.name).string \
for chn in ent.chains])
exp_seqres = "".join([trg_ent["seqres"] \
for trg_ent in mdlcf_json["target_entities"]])
assert ent_seqres == exp_seqres, f"Bad seqres {mdl_id}"
print(f" ... done with {mdl_id} ({timer()-pdb_start:.2f}s).")
def _main():
"""Run as script."""
opts = _parse_args()
# parse/fetch global data
metadata_full = _get_metadata(opts.metadata_file)
pdb_files_split = _get_pdb_files(opts.model_base_dir)
if opts.refseq_path is not None:
refseqs = io.LoadSequenceList(opts.refseq_path)
else:
refseqs = None
# get on with models
print(f"Working on {opts.prefix}*...")
# iterate over models
for f_name in sorted(pdb_files_split):
if f_name.startswith(opts.prefix):
_translate2modelcif(
f_name, opts,
metadata_full[metadata_full.ID == f_name],
pdb_files_split[f_name],
refseqs.FindSequence(f_name) if refseqs is not None else None,
)
print(f"... done with {opts.prefix}*.")
if __name__ == "__main__":
_main()