diff --git a/projects/dark-matter-metagenomics/translate2modelcif.py b/projects/dark-matter-metagenomics/translate2modelcif.py new file mode 100644 index 0000000000000000000000000000000000000000..cd3a91dc656e5acbc65b4462b259ccada4a2a426 --- /dev/null +++ b/projects/dark-matter-metagenomics/translate2modelcif.py @@ -0,0 +1,877 @@ +#! /usr/local/bin/ost +# -*- coding: utf-8 -*- + +"""Translate models for Sergey from PDB + extra data into ModelCIF.""" + +# EXAMPLES for running: +""" +ost translate2modelcif.py ./raw_data ./raw_data/ptm_plddt.all.txt \ + ./web_dloads/pivot ./modelcif --prefix=F000347 \ + --pdb-web-path=./web_dloads/pdb \ + --refseq-path=./web_dloads/consensus_all.fasta +""" +# NOTE: add "--compress" for final runs + +from timeit import default_timer as timer +import argparse +import filecmp +import gzip +import os +import pandas as pd +import shutil +import sys +import zipfile + +import ihm +import ihm.citations +import modelcif +import modelcif.associated +import modelcif.dumper +import modelcif.model +import modelcif.protocol +import modelcif.reference + +from ost import io + + +def _abort_msg(msg, exit_code=1): + """Write error message and exit with exit_code.""" + print(f"{msg}\nAborting.", file=sys.stderr) + sys.exit(exit_code) + + +def _warn_msg(msg): + """Write a warning message to stdout.""" + print(f"WARNING: {msg}") + + +def _check_file(file_path): + """Make sure a file exists and is actually a file.""" + if not os.path.exists(file_path): + _abort_msg(f"File not found: '{file_path}'.") + if not os.path.isfile(file_path): + _abort_msg(f"File path does not point to file: '{file_path}'.") + + +def _check_folder(dir_path): + """Make sure a file exists and is actually a file.""" + if not os.path.exists(dir_path): + _abort_msg(f"Path not found: '{dir_path}'.") + if not os.path.isdir(dir_path): + _abort_msg(f"Path does not point to a directory: '{dir_path}'.") + + +def _check_opts_folder(dir_path): + """Remove trailing '/' (return fixed one) and check if path valid.""" + if dir_path.endswith("/"): + dir_path = dir_path[:-1] + _check_folder(dir_path) + return dir_path + + +def _parse_args(): + """Parse command line arguments.""" + parser = argparse.ArgumentParser( + formatter_class=argparse.RawDescriptionHelpFormatter, + description=__doc__, + ) + + parser.add_argument( + "model_base_dir", + type=str, + metavar="<MODEL BASE DIR>", + help="Directory with pub_data* directories with model PDBs.", + ) + parser.add_argument( + "metadata_file", + type=str, + metavar="<METADATA FILE>", + help="Path to table with metadata.", + ) + parser.add_argument( + "msa_data_dir", + type=str, + metavar="<MSA DIR>", + help="Directory with F*.fasta files for custom MSAs.", + ) + parser.add_argument( + "out_dir", + type=str, + metavar="<OUTPUT DIR>", + help="Path to directory to store results.", + ) + parser.add_argument( + "--prefix", + type=str, + metavar="<PREFIX>", + default="", + help="Only process families starting with given prefix. By default " \ + + "all families are processed.", + ) + parser.add_argument( + "--compress", + default=False, + action="store_true", + help="Compress ModelCIF file with gzip " + "(note that acc. data is zipped either way).", + ) + parser.add_argument( + "--pdb-web-path", + type=str, + metavar="<PDB WEB PATH>", + default=None, + help="Optional path to directory with F*.pdb files as available on " \ + + "NMPFamsDB web site. 
Used to check if top ranked model the same.", + ) + parser.add_argument( + "--refseq-path", + type=str, + metavar="<PDB REFSEQ PATH>", + default=None, + help="Optional path to fasta file with all ref. seq. as available on " \ + + "NMPFamsDB web site. Used to check if it matches with MSA.", + ) + opts = parser.parse_args() + + # check input + opts.model_base_dir = _check_opts_folder(opts.model_base_dir) + _check_file(opts.metadata_file) + opts.msa_data_dir = _check_opts_folder(opts.msa_data_dir) + # check out_dir + if opts.out_dir.endswith("/"): + opts.out_dir = opts.out_dir[:-1] + if not os.path.exists(opts.out_dir): + os.makedirs(opts.out_dir) + if not os.path.isdir(opts.out_dir): + _abort_msg(f"Path '{opts.out_dir}' does not point to a directory.") + # check optional paths + if opts.pdb_web_path is not None: + opts.pdb_web_path = _check_opts_folder(opts.pdb_web_path) + if opts.refseq_path is not None: + _check_file(opts.refseq_path) + return opts + + +# pylint: disable=too-few-public-methods +class _GlobalPTM(modelcif.qa_metric.Global, modelcif.qa_metric.PTM): + """Predicted accuracy according to the TM-score score in [0,1]""" + name = "pTM" + software = None + + +class _GlobalPLDDT(modelcif.qa_metric.Global, modelcif.qa_metric.PLDDT): + """Predicted accuracy according to the CA-only lDDT in [0,100]""" + name = "pLDDT" + software = None + + +class _LocalPLDDT(modelcif.qa_metric.Local, modelcif.qa_metric.PLDDT): + """Predicted accuracy according to the CA-only lDDT in [0,100]""" + name = "pLDDT" + software = None + + +class _NmpfamsdbTrgRef(modelcif.reference.TargetReference): + """PDB as target reference.""" + name = "Other" + other_details = "NMPFamsDB" +# pylint: enable=too-few-public-methods + + +def _get_res_num(r, use_auth=False): + """Get res. num. from auth. IDs if reading from mmCIF files.""" + if use_auth: + return int(r.GetStringProp("pdb_auth_resnum")) + else: + return r.number.num + + +def _get_ch_name(ch, use_auth=False): + """Get chain name from auth. IDs if reading from mmCIF files.""" + if use_auth: + return ch.GetStringProp("pdb_auth_chain_name") + else: + return ch.name + + +class _OST2ModelCIF(modelcif.model.AbInitioModel): + """Map OST entity elements to ihm.model""" + + def __init__(self, *args, **kwargs): + """Initialise a model""" + for i in ["ost_entity", "asym", "scores_json"]: + if i not in kwargs: + raise TypeError(f"Required keyword argument '{i}' not found.") + self.ost_entity = kwargs.pop("ost_entity") + self.asym = kwargs.pop("asym") + self.scores_json = kwargs.pop("scores_json") + + # use auth IDs for res. nums and chain names + self.use_auth = False + + # fetch plddts per residue + self.plddts = [] + for res in self.ost_entity.residues: + b_factors = [a.b_factor for a in res.atoms] + assert len(set(b_factors)) == 1 # must all be equal! + self.plddts.append(b_factors[0]) + + super().__init__(*args, **kwargs) + + + def get_atoms(self): + # ToDo [internal]: Take B-factor out since its not a B-factor? + # NOTE: this assumes that _get_res_num maps residue to pos. 
in seqres + # within asym + for atm in self.ost_entity.atoms: + yield modelcif.model.Atom( + asym_unit=self.asym[_get_ch_name(atm.chain, self.use_auth)], + seq_id=_get_res_num(atm.residue, self.use_auth), + atom_id=atm.name, + type_symbol=atm.element, + x=atm.pos[0], + y=atm.pos[1], + z=atm.pos[2], + het=atm.is_hetatom, + biso=atm.b_factor, + occupancy=atm.occupancy, + ) + + def add_scores(self): + """Add QA metrics from AF2 scores.""" + # global scores + self.qa_metrics.extend( + ( + _GlobalPLDDT(self.scores_json["plddt_global"]), + _GlobalPTM(self.scores_json["ptm"]), + ) + ) + + + # local scores + i = 0 + for chn_i in self.ost_entity.chains: + ch_name = _get_ch_name(chn_i, self.use_auth) + for res_i in chn_i.residues: + # local pLDDT + res_num = _get_res_num(res_i, self.use_auth) + self.qa_metrics.append( + _LocalPLDDT( + self.asym[ch_name].residue(res_num), + self.plddts[i], + ) + ) + i += 1 + + +def _get_audit_authors(): + """Return the list of authors that produced this model.""" + return ( + "Pavlopoulos, Georgios A.", + "Baltoumas, Fotis A.", + "Liu, Sirui", + "Selvitopi, Oguz", + "Camargo, Antonio Pedro", + "Nayfach, Stephen", + "Azad, Ariful", + "Roux, Simon", + "Call, Lee", + "Ivanova, Natalia N.", + "Chen, I-Min", + "Paez-Espino, David", + "Karatzas, Evangelos", + "Novel Metagenome Protein Families Consortium", + "Iliopoulos, Ioannis", + "Konstantinidi, Konstantinos", + "Tiedje, James M.", + "Pett-Ridge, Jennifer", + "Baker, David", + "Visel, Axel", + "Ouzounis, Christos A.", + "Ovchinnikov, Sergey", + "Buluc, Aydin", + "Kyrpides, Nikos C.", + ) + + +def _get_metadata(metadata_file): + """Read csv file with metedata and prepare for next steps.""" + metadata = pd.read_csv(metadata_file, sep=' ', + names=["ID", "mdl", "pTM", "pLDDT"]) + return metadata + + +def _get_pdb_files(model_base_dir): + """Collect PDB files from pub_data_* folders. + Returns dict with key = family name and value = list of paths to PDB files. + """ + pdb_files_split = dict() # to return + pdb_files_raw = set() # to check for duplicates + pub_paths = [f for f in os.listdir(model_base_dir) \ + if f.startswith("pub_data_")] + # NOTE: we sort pub_paths to ensure that pub_data_02 is before _03 + for pub_path in sorted(pub_paths): + sub_path = os.path.join(model_base_dir, pub_path) + pdb_files_new = [f for f in os.listdir(sub_path) \ + if not f.startswith(".")] + for f in pdb_files_new: + f_path = os.path.join(sub_path, f) + f_name = f.split('_')[0] + if f_name in pdb_files_split: + pdb_files_split[f_name].append(f_path) + else: + pdb_files_split[f_name] = [f_path] + # check global list + pdb_files_new_set = set(pdb_files_new) + new_duplicates = pdb_files_raw.intersection(pdb_files_new_set) + if new_duplicates: + _warn_msg(f"{len(new_duplicates)} duplicated files found in " \ + f"{sub_path}.") + pdb_files_raw = pdb_files_raw.union(pdb_files_new_set) + return pdb_files_split + + +def _get_config(): + """Define AF setup.""" + msa_description = "MSA created by calculating the central or \"pivot\" " \ + "sequence of each seed MSA, and refining each " \ + "alignment using that sequence as the guide." + mdl_description = "Model generated using AlphaFold (v2.0.0 with models " \ + "finetuned to return ptm weights) producing 5 models, " \ + "without model relaxation, without templates, ranked " \ + "by pLDDT, starting from a custom MSA." 
+ af_config = {} + return { + "af_config": af_config, + "af_version": "2.0.0", + "mdl_description": mdl_description, + "msa_description": msa_description, + "use_templates": False, + "use_small_bfd": False, + "use_multimer": False, + } + + +def _get_protocol_steps_and_software(config_data): + """Create the list of protocol steps with software and parameters used.""" + protocol = [] + + # MSA step + step = { + "method_type": "coevolution MSA", + "name": None, + "details": config_data["msa_description"], + } + step["input"] = "target_sequences" + step["output"] = "MSA" + step["software"] = [] + step["software_parameters"] = {} + protocol.append(step) + + # modelling step + step = { + "method_type": "modeling", + "name": None, + "details": config_data["mdl_description"], + } + # get input data + # Must refer to data already in the JSON, so we try keywords + step["input"] = "target_sequences_and_MSA" + # get output data + # Must refer to existing data, so we try keywords + step["output"] = "model" + # get software + if config_data["use_multimer"]: + step["software"] = [{ + "name": "AlphaFold-Multimer", + "classification": "model building", + "description": "Structure prediction", + "citation": ihm.Citation( + pmid=None, + title="Protein complex prediction with " + + "AlphaFold-Multimer.", + journal="bioRxiv", + volume=None, + page_range=None, + year=2021, + authors=[ + "Evans, R.", + "O'Neill, M.", + "Pritzel, A.", + "Antropova, N.", + "Senior, A.", + "Green, T.", + "Zidek, A.", + "Bates, R.", + "Blackwell, S.", + "Yim, J.", + "Ronneberger, O.", + "Bodenstein, S.", + "Zielinski, M.", + "Bridgland, A.", + "Potapenko, A.", + "Cowie, A.", + "Tunyasuvunakool, K.", + "Jain, R.", + "Clancy, E.", + "Kohli, P.", + "Jumper, J.", + "Hassabis, D.", + ], + doi="10.1101/2021.10.04.463034", + ), + "location": "https://github.com/deepmind/alphafold", + "type": "package", + "version": config_data["af_version"], + }] + else: + step["software"] = [{ + "name": "AlphaFold", + "classification": "model building", + "description": "Structure prediction", + "citation": ihm.citations.alphafold2, + "location": "https://github.com/deepmind/alphafold", + "type": "package", + "version": config_data["af_version"], + }] + step["software_parameters"] = config_data["af_config"] + protocol.append(step) + + return protocol + + +def _get_title(fam_name): + """Get a title for this modelling experiment.""" + return f"AlphaFold model for NMPFamsDB Family {fam_name}" + + +def _get_model_details(fam_name): + """Get the model description.""" + db_url = f"https://bib.fleming.gr/NMPFamsDB/family?id={fam_name}" + # TODO: check if ok to use HTML for the URL + db_url = f"<a href=\"{db_url}\" target=\"_blank\">{db_url}</a>" + return f"Model generated using AlphaFold (v2.0.0) for the " \ + f"\"Representative Sequence\" of NMPFamsDB Metagenome / " \ + f"Metatranscriptome Family {fam_name}.\n\nSee {db_url} for " \ + f"additional details." + + +def _get_model_group_name(): + """Get a name for a model group.""" + return None + + +def _get_sequence(chn, use_auth=False): + """Get the sequence out of an OST chain incl. '-' for gaps in resnums.""" + # initialise (add gaps if first is not at num. 
1) + lst_rn = _get_res_num(chn.residues[0], use_auth) + idx = 1 + sqe = "-" * (lst_rn - 1) \ + + chn.residues[0].one_letter_code + + for res in chn.residues[idx:]: + lst_rn += 1 + while lst_rn != _get_res_num(res, use_auth): + sqe += "-" + lst_rn += 1 + sqe += res.one_letter_code + return sqe + + +def _get_entities(pdb_file, ref_seq, fam_name): + """Gather data for the mmCIF (target) entities.""" + _check_file(pdb_file) + + ost_ent = io.LoadPDB(pdb_file) + if ost_ent.chain_count != 1: + raise RuntimeError( + f"Unexpected oligomer in {pdb_file}" + ) + chn = ost_ent.chains[0] + sqe_gaps = _get_sequence(chn) + + # NOTE: can have gaps to accommodate "X" in ref_seq + exp_seq = sqe_gaps.replace('-', 'X') + len_diff = len(ref_seq.string) - len(exp_seq) + if len_diff > 0: + exp_seq += 'X' * len_diff + if exp_seq != ref_seq.string: + raise RuntimeError(f"Sequence in {pdb_file} does not match ref_seq") + + # TODO: waiting for input on whether they change handling of "X" in seq + # -> HERE: assuming that "X" were in modelled sequence and PDB can have gaps + cif_ent = { + "seqres": ref_seq.string, # HACK for testing: .replace('X', 'A') + "pdb_sequence": sqe_gaps, + "pdb_chain_id": [_get_ch_name(chn, False)], + "fam_name": fam_name, + "description": f"Representative Sequence of NMPFamsDB Family {fam_name}" + } + + return [cif_ent], ost_ent + + +def _get_modelcif_entities(target_ents, asym_units, system): + """Create ModelCIF entities and asymmetric units.""" + for cif_ent in target_ents: + mdlcif_ent = modelcif.Entity( + # NOTE: sequence here defines residues in model! + cif_ent["seqres"], + description=cif_ent["description"], + source=None, + references=[ + _NmpfamsdbTrgRef( + cif_ent["fam_name"], + cif_ent["fam_name"], + align_begin=1, + align_end=len(cif_ent["seqres"]) + ) + ], + ) + # NOTE: this assigns (potentially new) alphabetic chain names + for pdb_chain_id in cif_ent["pdb_chain_id"]: + asym_units[pdb_chain_id] = modelcif.AsymUnit( + mdlcif_ent, strand_id=pdb_chain_id, + ) + system.target_entities.append(mdlcif_ent) + + +def _get_assoc_aln_file(fle_path): + """Generate a modelcif.associated.File object pointing to FASTA formatted + file containing MSA. 
+ """ + cfile = modelcif.associated.File( + fle_path, + details="Custom MSA for modelling", + data=modelcif.data.Data("Custom MSA for modelling") + ) + cfile.file_format = "fasta" + cfile.file_content = "multiple sequence alignments" + return cfile + + +def _get_associated_files(mdl_name, arc_files): + """Create entry for associated files.""" + # package all into zip file + return modelcif.associated.Repository( + "", + [modelcif.associated.ZipFile(f"{mdl_name}.zip", files=arc_files)], + ) + # NOTE: by convention MA expects zip file with same name as model-cif + + +def _assemble_modelcif_software(soft_dict): + """Create a modelcif.Software instance from dictionary.""" + return modelcif.Software( + soft_dict["name"], + soft_dict["classification"], + soft_dict["description"], + soft_dict["location"], + soft_dict["type"], + soft_dict["version"], + citation=soft_dict["citation"], + ) + + +def _get_modelcif_protocol_software(js_step): + """Assemble software entries for a ModelCIF protocol step.""" + if js_step["software"]: + if len(js_step["software"]) == 1: + sftwre = _assemble_modelcif_software(js_step["software"][0]) + else: + sftwre = [] + for sft in js_step["software"]: + sftwre.append(_assemble_modelcif_software(sft)) + sftwre = modelcif.SoftwareGroup(elements=sftwre) + if js_step["software_parameters"]: + params = [] + for key, val in js_step["software_parameters"].items(): + params.append(modelcif.SoftwareParameter(key, val)) + if isinstance(sftwre, modelcif.SoftwareGroup): + sftwre.parameters = params + else: + sftwre = modelcif.SoftwareGroup( + elements=(sftwre,), parameters=params + ) + return sftwre + return None + + +def _get_modelcif_protocol_data(data_label, target_entities, aln_data, model): + """Assemble data for a ModelCIF protocol step.""" + if data_label == "target_sequences": + data = modelcif.data.DataGroup(target_entities) + elif data_label == "MSA": + data = aln_data + elif data_label == "target_sequences_and_MSA": + data = modelcif.data.DataGroup(target_entities) + data.append(aln_data) + elif data_label == "model": + data = model + else: + raise RuntimeError(f"Unknown protocol data: '{data_label}'") + return data + + +def _get_modelcif_protocol(protocol_steps, target_entities, aln_data, model): + """Create the protocol for the ModelCIF file.""" + protocol = modelcif.protocol.Protocol() + for js_step in protocol_steps: + sftwre = _get_modelcif_protocol_software(js_step) + input_data = _get_modelcif_protocol_data( + js_step["input"], target_entities, aln_data, model + ) + output_data = _get_modelcif_protocol_data( + js_step["output"], target_entities, aln_data, model + ) + + protocol.steps.append( + modelcif.protocol.Step( + input_data=input_data, + output_data=output_data, + name=js_step["name"], + details=js_step["details"], + software=sftwre, + ) + ) + protocol.steps[-1].method_type = js_step["method_type"] + return protocol + + +def _compress_cif_file(cif_file): + """Compress cif file and delete original.""" + with open(cif_file, 'rb') as f_in: + with gzip.open(cif_file + '.gz', 'wb') as f_out: + shutil.copyfileobj(f_in, f_out) + os.remove(cif_file) + + +def _package_associated_files(repo): + """Compress associated files into single zip file and delete original.""" + # zip settings tested for good speed vs compression + for archive in repo.files: + with zipfile.ZipFile(archive.path, "w", zipfile.ZIP_BZIP2) as cif_zip: + for zfile in archive.files: + cif_zip.write(zfile.path, arcname=zfile.path) + os.remove(zfile.path) + + +def _store_as_modelcif(data_json, ost_ent, 
out_dir, mdl_name, compress): + """Mix all the data into a ModelCIF file.""" + print(" generating ModelCIF objects...", end="") + pstart = timer() + # create system to gather all the data + system = modelcif.System( + title=data_json["title"], + id=data_json["mdl_id"].upper(), + model_details=data_json["model_details"], + ) + + # create an asymmetric unit and an entity per target sequence + asym_units = {} + _get_modelcif_entities( + data_json["target_entities"], asym_units, system + ) + + # audit_authors + system.authors.extend(data_json["audit_authors"]) + + # set up the model to produce coordinates + model = _OST2ModelCIF( + assembly=modelcif.Assembly(asym_units.values()), + asym=asym_units, + ost_entity=ost_ent, + scores_json=data_json, + name=data_json["mdl_name"], + ) + print(f" ({timer()-pstart:.2f}s)") + print(" processing QA scores...", end="", flush=True) + pstart = timer() + model.add_scores() + print(f" ({timer()-pstart:.2f}s)") + + model_group = modelcif.model.ModelGroup( + [model], name=data_json["model_group_name"] + ) + system.model_groups.append(model_group) + + # handle additional files + aln_file = _get_assoc_aln_file(data_json["aln_file_name"]) + system.repositories.append( + _get_associated_files(mdl_name, [aln_file]) + ) + + # get data and steps + protocol = _get_modelcif_protocol( + data_json["protocol"], system.target_entities, aln_file.data, model, + ) + system.protocols.append(protocol) + + # write modelcif System to file (NOTE: no PAE here!) + print(" write to disk...", end="", flush=True) + pstart = timer() + # copy aln file to compress them + shutil.copyfile( + data_json["aln_file_path"], + os.path.join(out_dir, data_json["aln_file_name"]) + ) + # NOTE: we change path and back while being exception-safe to handle zipfile + oldpwd = os.getcwd() + os.chdir(out_dir) + mdl_fle = f"{mdl_name}.cif" + try: + with open(mdl_fle, "w", encoding="ascii") as mmcif_fh: + modelcif.dumper.write(mmcif_fh, [system]) + _package_associated_files(system.repositories[0]) + if compress: + _compress_cif_file(mdl_fle) + mdl_fle += ".gz" + finally: + os.chdir(oldpwd) + print(f" ({timer()-pstart:.2f}s)") + + +def _translate2modelcif(f_name, opts, metadata_fam, pdb_files, ref_seq_check): + """Convert a model with its accompanying data to ModelCIF.""" + + # TODO: unclear what to do with such cases; skipped for now + if len(metadata_fam) != 5: + _warn_msg(f"Unexpected number of {len(metadata_fam)} models in " \ + f"metadata for family {f_name}. Skipping...") + return + # + + mdl_id = f_name + # skip if done already + if opts.compress: + cifext = "cif.gz" + else: + cifext = "cif" + mdl_path = os.path.join(opts.out_dir, f"{mdl_id}.{cifext}") + if os.path.exists(mdl_path): + print(f" {mdl_id} already done...") + return + + # go for it... + print(f" translating {mdl_id}...") + pdb_start = timer() + + # get aln_data and ref. seq. for this entry + aln_file = f"{f_name}.fasta" + aln_path = os.path.join(opts.msa_data_dir, aln_file) + # TODO: if we need to handle files without ALN, this needs fixing + # -> e.g. 11 extra models in pub_data_* cannot be handled right now + if not os.path.exists(aln_path): + _warn_msg(f"Cannot deal with missing MSA for {f_name} (yet). 
" \ + f"Skipping...") + return + # + aln = io.LoadAlignment(aln_path) # note: this checks that it's an actual MSA + ref_seq = aln.sequences[0] + if ref_seq_check is not None and ref_seq_check.string != ref_seq.string: + raise RuntimeError(f"Sequence mismatch for {f_name}") + # TODO: allow "X" (or whatever can be used to label unknown AA) if needed + if "X" in ref_seq.string: + _warn_msg(f"Cannot deal with 'X' in ref_seq for {f_name} (yet). " \ + f"Skipping...") + return + # + + # gather data into JSON-like structure + print(" preparing data...", end="") + pstart = timer() + + config_data = _get_config() + mdlcf_json = {} + mdlcf_json["audit_authors"] = _get_audit_authors() + mdlcf_json["protocol"] = _get_protocol_steps_and_software(config_data) + mdlcf_json["config_data"] = config_data + mdlcf_json["mdl_id"] = mdl_id + mdlcf_json["aln_file_name"] = aln_file + mdlcf_json["aln_file_path"] = aln_path + + # find model to process + # TODO: here just top pLDDT model processed; extend for more if needed... + top_metadata = metadata_fam.loc[metadata_fam.pLDDT.idxmax()] + pdb_list_sel = [f for f in pdb_files if top_metadata.mdl in f] + if len(pdb_list_sel) != 1: + # this should only happen if duplicated file in pub_data_* + # TODO: for now no warning shown and we just pick first hit + # -> first hit is from lowest "pub_data_*" + # -> unclear if we should worry about it... + pass + mdlcf_json["mdl_name"] = f"Top ranked model ({top_metadata.mdl})" + + # get scores for this entry + mdlcf_json["plddt_global"] = top_metadata.pLDDT + mdlcf_json["ptm"] = top_metadata.pTM + + # process coordinates + pdb_file = pdb_list_sel[0] + target_entities, ost_ent = _get_entities(pdb_file, ref_seq, f_name) + mdlcf_json["target_entities"] = target_entities + # sanity check (only for top ranked model!) + if opts.pdb_web_path is not None: + pdb_file_web = os.path.join(opts.pdb_web_path, f"{f_name}.pdb") + if not filecmp.cmp(pdb_file, pdb_file_web): + raise RuntimeError(f"PDB file mismatch web vs top-ranked for " \ + f"{f_name}") + + # fill annotations + mdlcf_json["title"] = _get_title(f_name) + mdlcf_json["model_details"] = _get_model_details(f_name) + mdlcf_json["model_group_name"] = _get_model_group_name() + print(f" ({timer()-pstart:.2f}s)") + + # save ModelCIF + _store_as_modelcif(mdlcf_json, ost_ent, opts.out_dir, mdl_id, opts.compress) + + # check if result can be read and has expected seq. + ent, ss = io.LoadMMCIF(mdl_path, seqres=True) + exp_seqs = [trg_ent["pdb_sequence"] \ + for trg_ent in mdlcf_json["target_entities"]] + assert ent.chain_count == len(exp_seqs), f"Bad chain count {mdl_id}" + # here we expect auth = label IDs + ent_seq = "".join([_get_sequence(chn, False) for chn in ent.chains]) + ent_seq_a = "".join([_get_sequence(chn, True) for chn in ent.chains]) + assert ent_seq == ent_seq_a + assert ent_seq == "".join(exp_seqs), f"Bad seq. {mdl_id}" + ent_seqres = "".join([ss.FindSequence(chn.name).string \ + for chn in ent.chains]) + exp_seqres = "".join([trg_ent["seqres"] \ + for trg_ent in mdlcf_json["target_entities"]]) + assert ent_seqres == exp_seqres, f"Bad seqres {mdl_id}" + + print(f" ... 
done with {mdl_id} ({timer()-pdb_start:.2f}s).") + + +def _main(): + """Run as script.""" + opts = _parse_args() + + # parse/fetch global data + metadata_full = _get_metadata(opts.metadata_file) + pdb_files_split = _get_pdb_files(opts.model_base_dir) + if opts.refseq_path is not None: + refseqs = io.LoadSequenceList(opts.refseq_path) + else: + refseqs = None + + # get on with models + print(f"Working on {opts.prefix}*...") + + # iterate over models + for f_name in sorted(pdb_files_split): + if f_name.startswith(opts.prefix): + _translate2modelcif( + f_name, opts, + metadata_full[metadata_full.ID == f_name], + pdb_files_split[f_name], + refseqs.FindSequence(f_name) if refseqs is not None else None, + ) + + print(f"... done with {opts.prefix}*.") + + +if __name__ == "__main__": + _main()
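For reference, the metadata table passed as <METADATA FILE> (e.g. ptm_plddt.all.txt) is read by _get_metadata with pandas as a whitespace-separated file without a header, yielding the columns ID, mdl, pTM and pLDDT; _translate2modelcif then keeps the row with the highest pLDDT per family. Below is a minimal, self-contained sketch of that parsing and selection step; the family ID, model identifiers and scores are hypothetical and only illustrate the expected shape of the file:

from io import StringIO

import pandas as pd

# Two hypothetical rows in the format _get_metadata() expects:
# family ID, model identifier, pTM, pLDDT (whitespace-separated, no header).
example = (
    "F000347 F000347_model_3 0.71 83.2\n"
    "F000347 F000347_model_1 0.68 81.5\n"
)

metadata = pd.read_csv(
    StringIO(example), sep=" ", names=["ID", "mdl", "pTM", "pLDDT"]
)

# Same selection as in _translate2modelcif(): top-ranked model by pLDDT.
top = metadata.loc[metadata.pLDDT.idxmax()]
print(top.mdl, top.pTM, top.pLDDT)  # -> F000347_model_3 0.71 83.2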
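Likewise, a short sketch of how the outputs could be sanity-checked downstream, assuming an uncompressed run (no --compress) and the hypothetical family ID F000347; it relies on python-modelcif's reader module and on the convention noted in _get_associated_files that the associated archive carries the same basename as the ModelCIF file:

import zipfile

import modelcif.reader

# Read the generated ModelCIF file back and report basic content.
with open("F000347.cif") as fh:
    system = modelcif.reader.read(fh)[0]
print(system.title)
for group in system.model_groups:
    for mdl in group:
        print(mdl.name)

# The accompanying zip archive should contain the custom MSA (FASTA).
with zipfile.ZipFile("F000347.zip") as zf:
    print(zf.namelist())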