#! /usr/local/bin/ost # -*- coding: utf-8 -*- """Translate models for Sergey from PDB + extra data into ModelCIF.""" # EXAMPLES for running: """ ost translate2modelcif.py ./raw_data ./raw_data/ptm_plddt.all.txt \ ./web_dloads/pivot ./modelcif --prefix=F000347 \ --pdb-web-path=./web_dloads/pdb \ --refseq-path=./web_dloads/consensus_all.fasta """ # NOTE: add "--compress" for final runs from timeit import default_timer as timer import argparse import filecmp import gzip import os import pandas as pd import shutil import sys import zipfile import ihm import ihm.citations import modelcif import modelcif.associated import modelcif.dumper import modelcif.model import modelcif.protocol import modelcif.reference from ost import io def _abort_msg(msg, exit_code=1): """Write error message and exit with exit_code.""" print(f"{msg}\nAborting.", file=sys.stderr) sys.exit(exit_code) def _warn_msg(msg): """Write a warning message to stdout.""" print(f"WARNING: {msg}") def _check_file(file_path): """Make sure a file exists and is actually a file.""" if not os.path.exists(file_path): _abort_msg(f"File not found: '{file_path}'.") if not os.path.isfile(file_path): _abort_msg(f"File path does not point to file: '{file_path}'.") def _check_folder(dir_path): """Make sure a file exists and is actually a file.""" if not os.path.exists(dir_path): _abort_msg(f"Path not found: '{dir_path}'.") if not os.path.isdir(dir_path): _abort_msg(f"Path does not point to a directory: '{dir_path}'.") def _check_opts_folder(dir_path): """Remove trailing '/' (return fixed one) and check if path valid.""" if dir_path.endswith("/"): dir_path = dir_path[:-1] _check_folder(dir_path) return dir_path def _parse_args(): """Parse command line arguments.""" parser = argparse.ArgumentParser( formatter_class=argparse.RawDescriptionHelpFormatter, description=__doc__, ) parser.add_argument( "model_base_dir", type=str, metavar="<MODEL BASE DIR>", help="Directory with pub_data* directories with model PDBs.", ) parser.add_argument( "metadata_file", type=str, metavar="<METADATA FILE>", help="Path to table with metadata.", ) parser.add_argument( "msa_data_dir", type=str, metavar="<MSA DIR>", help="Directory with F*.fasta files for custom MSAs.", ) parser.add_argument( "out_dir", type=str, metavar="<OUTPUT DIR>", help="Path to directory to store results.", ) parser.add_argument( "--prefix", type=str, metavar="<PREFIX>", default="", help="Only process families starting with given prefix. By default " \ + "all families are processed.", ) parser.add_argument( "--compress", default=False, action="store_true", help="Compress ModelCIF file with gzip " "(note that acc. data is zipped either way).", ) parser.add_argument( "--pdb-web-path", type=str, metavar="<PDB WEB PATH>", default=None, help="Optional path to directory with F*.pdb files as available on " \ + "NMPFamsDB web site. Used to check if top ranked model the same.", ) parser.add_argument( "--refseq-path", type=str, metavar="<PDB REFSEQ PATH>", default=None, help="Optional path to fasta file with all ref. seq. as available on " \ + "NMPFamsDB web site. Used to check if it matches with MSA.", ) opts = parser.parse_args() # check input opts.model_base_dir = _check_opts_folder(opts.model_base_dir) _check_file(opts.metadata_file) opts.msa_data_dir = _check_opts_folder(opts.msa_data_dir) # check out_dir if opts.out_dir.endswith("/"): opts.out_dir = opts.out_dir[:-1] if not os.path.exists(opts.out_dir): os.makedirs(opts.out_dir) if not os.path.isdir(opts.out_dir): _abort_msg(f"Path '{opts.out_dir}' does not point to a directory.") # check optional paths if opts.pdb_web_path is not None: opts.pdb_web_path = _check_opts_folder(opts.pdb_web_path) if opts.refseq_path is not None: _check_file(opts.refseq_path) return opts # pylint: disable=too-few-public-methods class _GlobalPTM(modelcif.qa_metric.Global, modelcif.qa_metric.PTM): """Predicted accuracy according to the TM-score score in [0,1]""" name = "pTM" software = None class _GlobalPLDDT(modelcif.qa_metric.Global, modelcif.qa_metric.PLDDT): """Predicted accuracy according to the CA-only lDDT in [0,100]""" name = "pLDDT" software = None class _LocalPLDDT(modelcif.qa_metric.Local, modelcif.qa_metric.PLDDT): """Predicted accuracy according to the CA-only lDDT in [0,100]""" name = "pLDDT" software = None class _NmpfamsdbTrgRef(modelcif.reference.TargetReference): """PDB as target reference.""" name = "Other" other_details = "NMPFamsDB" # pylint: enable=too-few-public-methods def _get_res_num(r, use_auth=False): """Get res. num. from auth. IDs if reading from mmCIF files.""" if use_auth: return int(r.GetStringProp("pdb_auth_resnum")) else: return r.number.num def _get_ch_name(ch, use_auth=False): """Get chain name from auth. IDs if reading from mmCIF files.""" if use_auth: return ch.GetStringProp("pdb_auth_chain_name") else: return ch.name class _OST2ModelCIF(modelcif.model.AbInitioModel): """Map OST entity elements to ihm.model""" def __init__(self, *args, **kwargs): """Initialise a model""" for i in ["ost_entity", "asym", "scores_json"]: if i not in kwargs: raise TypeError(f"Required keyword argument '{i}' not found.") self.ost_entity = kwargs.pop("ost_entity") self.asym = kwargs.pop("asym") self.scores_json = kwargs.pop("scores_json") # use auth IDs for res. nums and chain names self.use_auth = False # fetch plddts per residue self.plddts = [] for res in self.ost_entity.residues: b_factors = [a.b_factor for a in res.atoms] assert len(set(b_factors)) == 1 # must all be equal! self.plddts.append(b_factors[0]) super().__init__(*args, **kwargs) def get_atoms(self): # ToDo [internal]: Take B-factor out since its not a B-factor? # NOTE: this assumes that _get_res_num maps residue to pos. in seqres # within asym for atm in self.ost_entity.atoms: yield modelcif.model.Atom( asym_unit=self.asym[_get_ch_name(atm.chain, self.use_auth)], seq_id=_get_res_num(atm.residue, self.use_auth), atom_id=atm.name, type_symbol=atm.element, x=atm.pos[0], y=atm.pos[1], z=atm.pos[2], het=atm.is_hetatom, biso=atm.b_factor, occupancy=atm.occupancy, ) def add_scores(self): """Add QA metrics from AF2 scores.""" # global scores self.qa_metrics.extend( ( _GlobalPLDDT(self.scores_json["plddt_global"]), _GlobalPTM(self.scores_json["ptm"]), ) ) # local scores i = 0 for chn_i in self.ost_entity.chains: ch_name = _get_ch_name(chn_i, self.use_auth) for res_i in chn_i.residues: # local pLDDT res_num = _get_res_num(res_i, self.use_auth) self.qa_metrics.append( _LocalPLDDT( self.asym[ch_name].residue(res_num), self.plddts[i], ) ) i += 1 def _get_audit_authors(): """Return the list of authors that produced this model.""" return ( "Pavlopoulos, Georgios A.", "Baltoumas, Fotis A.", "Liu, Sirui", "Selvitopi, Oguz", "Camargo, Antonio Pedro", "Nayfach, Stephen", "Azad, Ariful", "Roux, Simon", "Call, Lee", "Ivanova, Natalia N.", "Chen, I-Min", "Paez-Espino, David", "Karatzas, Evangelos", "Novel Metagenome Protein Families Consortium", "Iliopoulos, Ioannis", "Konstantinidi, Konstantinos", "Tiedje, James M.", "Pett-Ridge, Jennifer", "Baker, David", "Visel, Axel", "Ouzounis, Christos A.", "Ovchinnikov, Sergey", "Buluc, Aydin", "Kyrpides, Nikos C.", ) def _get_metadata(metadata_file): """Read csv file with metedata and prepare for next steps.""" metadata = pd.read_csv(metadata_file, sep=' ', names=["ID", "mdl", "pTM", "pLDDT"]) return metadata def _get_pdb_files(model_base_dir): """Collect PDB files from pub_data_* folders. Returns dict with key = family name and value = list of paths to PDB files. """ pdb_files_split = dict() # to return pdb_files_raw = set() # to check for duplicates pub_paths = [f for f in os.listdir(model_base_dir) \ if f.startswith("pub_data_")] # NOTE: we sort pub_paths to ensure that pub_data_02 is before _03 for pub_path in sorted(pub_paths): sub_path = os.path.join(model_base_dir, pub_path) pdb_files_new = [f for f in os.listdir(sub_path) \ if not f.startswith(".")] for f in pdb_files_new: f_path = os.path.join(sub_path, f) f_name = f.split('_')[0] if f_name in pdb_files_split: pdb_files_split[f_name].append(f_path) else: pdb_files_split[f_name] = [f_path] # check global list pdb_files_new_set = set(pdb_files_new) new_duplicates = pdb_files_raw.intersection(pdb_files_new_set) if new_duplicates: _warn_msg(f"{len(new_duplicates)} duplicated files found in " \ f"{sub_path}.") pdb_files_raw = pdb_files_raw.union(pdb_files_new_set) return pdb_files_split def _get_config(): """Define AF setup.""" msa_description = "MSA created by calculating the central or \"pivot\" " \ "sequence of each seed MSA, and refining each " \ "alignment using that sequence as the guide." mdl_description = "Model generated using AlphaFold (v2.0.0 with models " \ "finetuned to return ptm weights) producing 5 models, " \ "without model relaxation, without templates, ranked " \ "by pLDDT, starting from a custom MSA." af_config = {} return { "af_config": af_config, "af_version": "2.0.0", "mdl_description": mdl_description, "msa_description": msa_description, "use_templates": False, "use_small_bfd": False, "use_multimer": False, } def _get_protocol_steps_and_software(config_data): """Create the list of protocol steps with software and parameters used.""" protocol = [] # MSA step step = { "method_type": "coevolution MSA", "name": None, "details": config_data["msa_description"], } step["input"] = "target_sequences" step["output"] = "MSA" step["software"] = [] step["software_parameters"] = {} protocol.append(step) # modelling step step = { "method_type": "modeling", "name": None, "details": config_data["mdl_description"], } # get input data # Must refer to data already in the JSON, so we try keywords step["input"] = "target_sequences_and_MSA" # get output data # Must refer to existing data, so we try keywords step["output"] = "model" # get software if config_data["use_multimer"]: step["software"] = [{ "name": "AlphaFold-Multimer", "classification": "model building", "description": "Structure prediction", "citation": ihm.Citation( pmid=None, title="Protein complex prediction with " + "AlphaFold-Multimer.", journal="bioRxiv", volume=None, page_range=None, year=2021, authors=[ "Evans, R.", "O'Neill, M.", "Pritzel, A.", "Antropova, N.", "Senior, A.", "Green, T.", "Zidek, A.", "Bates, R.", "Blackwell, S.", "Yim, J.", "Ronneberger, O.", "Bodenstein, S.", "Zielinski, M.", "Bridgland, A.", "Potapenko, A.", "Cowie, A.", "Tunyasuvunakool, K.", "Jain, R.", "Clancy, E.", "Kohli, P.", "Jumper, J.", "Hassabis, D.", ], doi="10.1101/2021.10.04.463034", ), "location": "https://github.com/deepmind/alphafold", "type": "package", "version": config_data["af_version"], }] else: step["software"] = [{ "name": "AlphaFold", "classification": "model building", "description": "Structure prediction", "citation": ihm.citations.alphafold2, "location": "https://github.com/deepmind/alphafold", "type": "package", "version": config_data["af_version"], }] step["software_parameters"] = config_data["af_config"] protocol.append(step) return protocol def _get_title(fam_name): """Get a title for this modelling experiment.""" return f"AlphaFold model for NMPFamsDB Family {fam_name}" def _get_model_details(fam_name): """Get the model description.""" db_url = f"https://bib.fleming.gr/NMPFamsDB/family?id={fam_name}" # TODO: check if ok to use HTML for the URL db_url = f"<a href=\"{db_url}\" target=\"_blank\">{db_url}</a>" return f"Model generated using AlphaFold (v2.0.0) for the " \ f"\"Representative Sequence\" of NMPFamsDB Metagenome / " \ f"Metatranscriptome Family {fam_name}.\n\nSee {db_url} for " \ f"additional details." def _get_model_group_name(): """Get a name for a model group.""" return None def _get_sequence(chn, use_auth=False): """Get the sequence out of an OST chain incl. '-' for gaps in resnums.""" # initialise (add gaps if first is not at num. 1) lst_rn = _get_res_num(chn.residues[0], use_auth) idx = 1 sqe = "-" * (lst_rn - 1) \ + chn.residues[0].one_letter_code for res in chn.residues[idx:]: lst_rn += 1 while lst_rn != _get_res_num(res, use_auth): sqe += "-" lst_rn += 1 sqe += res.one_letter_code return sqe def _get_entities(pdb_file, ref_seq, fam_name): """Gather data for the mmCIF (target) entities.""" _check_file(pdb_file) ost_ent = io.LoadPDB(pdb_file) if ost_ent.chain_count != 1: raise RuntimeError( f"Unexpected oligomer in {pdb_file}" ) chn = ost_ent.chains[0] sqe_gaps = _get_sequence(chn) # NOTE: can have gaps to accommodate "X" in ref_seq exp_seq = sqe_gaps.replace('-', 'X') len_diff = len(ref_seq.string) - len(exp_seq) if len_diff > 0: exp_seq += 'X' * len_diff if exp_seq != ref_seq.string: raise RuntimeError(f"Sequence in {pdb_file} does not match ref_seq") # TODO: waiting for input on whether they change handling of "X" in seq # -> HERE: assuming that "X" were in modelled sequence and PDB can have gaps cif_ent = { "seqres": ref_seq.string, # HACK for testing: .replace('X', 'A') "pdb_sequence": sqe_gaps, "pdb_chain_id": [_get_ch_name(chn, False)], "fam_name": fam_name, "description": f"Representative Sequence of NMPFamsDB Family {fam_name}" } return [cif_ent], ost_ent def _get_modelcif_entities(target_ents, asym_units, system): """Create ModelCIF entities and asymmetric units.""" for cif_ent in target_ents: mdlcif_ent = modelcif.Entity( # NOTE: sequence here defines residues in model! cif_ent["seqres"], description=cif_ent["description"], source=None, references=[ _NmpfamsdbTrgRef( cif_ent["fam_name"], cif_ent["fam_name"], align_begin=1, align_end=len(cif_ent["seqres"]) ) ], ) # NOTE: this assigns (potentially new) alphabetic chain names for pdb_chain_id in cif_ent["pdb_chain_id"]: asym_units[pdb_chain_id] = modelcif.AsymUnit( mdlcif_ent, strand_id=pdb_chain_id, ) system.target_entities.append(mdlcif_ent) def _get_assoc_aln_file(fle_path): """Generate a modelcif.associated.File object pointing to FASTA formatted file containing MSA. """ cfile = modelcif.associated.File( fle_path, details="Custom MSA for modelling", data=modelcif.data.Data("Custom MSA for modelling") ) cfile.file_format = "fasta" cfile.file_content = "multiple sequence alignments" return cfile def _get_associated_files(mdl_name, arc_files): """Create entry for associated files.""" # package all into zip file return modelcif.associated.Repository( "", [modelcif.associated.ZipFile(f"{mdl_name}.zip", files=arc_files)], ) # NOTE: by convention MA expects zip file with same name as model-cif def _assemble_modelcif_software(soft_dict): """Create a modelcif.Software instance from dictionary.""" return modelcif.Software( soft_dict["name"], soft_dict["classification"], soft_dict["description"], soft_dict["location"], soft_dict["type"], soft_dict["version"], citation=soft_dict["citation"], ) def _get_modelcif_protocol_software(js_step): """Assemble software entries for a ModelCIF protocol step.""" if js_step["software"]: if len(js_step["software"]) == 1: sftwre = _assemble_modelcif_software(js_step["software"][0]) else: sftwre = [] for sft in js_step["software"]: sftwre.append(_assemble_modelcif_software(sft)) sftwre = modelcif.SoftwareGroup(elements=sftwre) if js_step["software_parameters"]: params = [] for key, val in js_step["software_parameters"].items(): params.append(modelcif.SoftwareParameter(key, val)) if isinstance(sftwre, modelcif.SoftwareGroup): sftwre.parameters = params else: sftwre = modelcif.SoftwareGroup( elements=(sftwre,), parameters=params ) return sftwre return None def _get_modelcif_protocol_data(data_label, target_entities, aln_data, model): """Assemble data for a ModelCIF protocol step.""" if data_label == "target_sequences": data = modelcif.data.DataGroup(target_entities) elif data_label == "MSA": data = aln_data elif data_label == "target_sequences_and_MSA": data = modelcif.data.DataGroup(target_entities) data.append(aln_data) elif data_label == "model": data = model else: raise RuntimeError(f"Unknown protocol data: '{data_label}'") return data def _get_modelcif_protocol(protocol_steps, target_entities, aln_data, model): """Create the protocol for the ModelCIF file.""" protocol = modelcif.protocol.Protocol() for js_step in protocol_steps: sftwre = _get_modelcif_protocol_software(js_step) input_data = _get_modelcif_protocol_data( js_step["input"], target_entities, aln_data, model ) output_data = _get_modelcif_protocol_data( js_step["output"], target_entities, aln_data, model ) protocol.steps.append( modelcif.protocol.Step( input_data=input_data, output_data=output_data, name=js_step["name"], details=js_step["details"], software=sftwre, ) ) protocol.steps[-1].method_type = js_step["method_type"] return protocol def _compress_cif_file(cif_file): """Compress cif file and delete original.""" with open(cif_file, 'rb') as f_in: with gzip.open(cif_file + '.gz', 'wb') as f_out: shutil.copyfileobj(f_in, f_out) os.remove(cif_file) def _package_associated_files(repo): """Compress associated files into single zip file and delete original.""" # zip settings tested for good speed vs compression for archive in repo.files: with zipfile.ZipFile(archive.path, "w", zipfile.ZIP_BZIP2) as cif_zip: for zfile in archive.files: cif_zip.write(zfile.path, arcname=zfile.path) os.remove(zfile.path) def _store_as_modelcif(data_json, ost_ent, out_dir, mdl_name, compress): """Mix all the data into a ModelCIF file.""" print(" generating ModelCIF objects...", end="") pstart = timer() # create system to gather all the data system = modelcif.System( title=data_json["title"], id=data_json["mdl_id"].upper(), model_details=data_json["model_details"], ) # create an asymmetric unit and an entity per target sequence asym_units = {} _get_modelcif_entities( data_json["target_entities"], asym_units, system ) # audit_authors system.authors.extend(data_json["audit_authors"]) # set up the model to produce coordinates model = _OST2ModelCIF( assembly=modelcif.Assembly(asym_units.values()), asym=asym_units, ost_entity=ost_ent, scores_json=data_json, name=data_json["mdl_name"], ) print(f" ({timer()-pstart:.2f}s)") print(" processing QA scores...", end="", flush=True) pstart = timer() model.add_scores() print(f" ({timer()-pstart:.2f}s)") model_group = modelcif.model.ModelGroup( [model], name=data_json["model_group_name"] ) system.model_groups.append(model_group) # handle additional files aln_file = _get_assoc_aln_file(data_json["aln_file_name"]) system.repositories.append( _get_associated_files(mdl_name, [aln_file]) ) # get data and steps protocol = _get_modelcif_protocol( data_json["protocol"], system.target_entities, aln_file.data, model, ) system.protocols.append(protocol) # write modelcif System to file (NOTE: no PAE here!) print(" write to disk...", end="", flush=True) pstart = timer() # copy aln file to compress them shutil.copyfile( data_json["aln_file_path"], os.path.join(out_dir, data_json["aln_file_name"]) ) # NOTE: we change path and back while being exception-safe to handle zipfile oldpwd = os.getcwd() os.chdir(out_dir) mdl_fle = f"{mdl_name}.cif" try: with open(mdl_fle, "w", encoding="ascii") as mmcif_fh: modelcif.dumper.write(mmcif_fh, [system]) _package_associated_files(system.repositories[0]) if compress: _compress_cif_file(mdl_fle) mdl_fle += ".gz" finally: os.chdir(oldpwd) print(f" ({timer()-pstart:.2f}s)") def _translate2modelcif(f_name, opts, metadata_fam, pdb_files, ref_seq_check): """Convert a model with its accompanying data to ModelCIF.""" # TODO: unclear what to do with such cases; skipped for now if len(metadata_fam) != 5: _warn_msg(f"Unexpected number of {len(metadata_fam)} models in " \ f"metadata for family {f_name}. Skipping...") return # mdl_id = f_name # skip if done already if opts.compress: cifext = "cif.gz" else: cifext = "cif" mdl_path = os.path.join(opts.out_dir, f"{mdl_id}.{cifext}") if os.path.exists(mdl_path): print(f" {mdl_id} already done...") return # go for it... print(f" translating {mdl_id}...") pdb_start = timer() # get aln_data and ref. seq. for this entry aln_file = f"{f_name}.fasta" aln_path = os.path.join(opts.msa_data_dir, aln_file) # TODO: if we need to handle files without ALN, this needs fixing # -> e.g. 11 extra models in pub_data_* cannot be handled right now if not os.path.exists(aln_path): _warn_msg(f"Cannot deal with missing MSA for {f_name} (yet). " \ f"Skipping...") return # aln = io.LoadAlignment(aln_path) # note: this checks that it's an actual MSA ref_seq = aln.sequences[0] if ref_seq_check is not None and ref_seq_check.string != ref_seq.string: raise RuntimeError(f"Sequence mismatch for {f_name}") # TODO: allow "X" (or whatever can be used to label unknown AA) if needed if "X" in ref_seq.string: _warn_msg(f"Cannot deal with 'X' in ref_seq for {f_name} (yet). " \ f"Skipping...") return # # gather data into JSON-like structure print(" preparing data...", end="") pstart = timer() config_data = _get_config() mdlcf_json = {} mdlcf_json["audit_authors"] = _get_audit_authors() mdlcf_json["protocol"] = _get_protocol_steps_and_software(config_data) mdlcf_json["config_data"] = config_data mdlcf_json["mdl_id"] = mdl_id mdlcf_json["aln_file_name"] = aln_file mdlcf_json["aln_file_path"] = aln_path # find model to process # TODO: here just top pLDDT model processed; extend for more if needed... top_metadata = metadata_fam.loc[metadata_fam.pLDDT.idxmax()] pdb_list_sel = [f for f in pdb_files if top_metadata.mdl in f] if len(pdb_list_sel) != 1: # this should only happen if duplicated file in pub_data_* # TODO: for now no warning shown and we just pick first hit # -> first hit is from lowest "pub_data_*" # -> unclear if we should worry about it... pass mdlcf_json["mdl_name"] = f"Top ranked model ({top_metadata.mdl})" # get scores for this entry mdlcf_json["plddt_global"] = top_metadata.pLDDT mdlcf_json["ptm"] = top_metadata.pTM # process coordinates pdb_file = pdb_list_sel[0] target_entities, ost_ent = _get_entities(pdb_file, ref_seq, f_name) mdlcf_json["target_entities"] = target_entities # sanity check (only for top ranked model!) if opts.pdb_web_path is not None: pdb_file_web = os.path.join(opts.pdb_web_path, f"{f_name}.pdb") if not filecmp.cmp(pdb_file, pdb_file_web): raise RuntimeError(f"PDB file mismatch web vs top-ranked for " \ f"{f_name}") # fill annotations mdlcf_json["title"] = _get_title(f_name) mdlcf_json["model_details"] = _get_model_details(f_name) mdlcf_json["model_group_name"] = _get_model_group_name() print(f" ({timer()-pstart:.2f}s)") # save ModelCIF _store_as_modelcif(mdlcf_json, ost_ent, opts.out_dir, mdl_id, opts.compress) # check if result can be read and has expected seq. ent, ss = io.LoadMMCIF(mdl_path, seqres=True) exp_seqs = [trg_ent["pdb_sequence"] \ for trg_ent in mdlcf_json["target_entities"]] assert ent.chain_count == len(exp_seqs), f"Bad chain count {mdl_id}" # here we expect auth = label IDs ent_seq = "".join([_get_sequence(chn, False) for chn in ent.chains]) ent_seq_a = "".join([_get_sequence(chn, True) for chn in ent.chains]) assert ent_seq == ent_seq_a assert ent_seq == "".join(exp_seqs), f"Bad seq. {mdl_id}" ent_seqres = "".join([ss.FindSequence(chn.name).string \ for chn in ent.chains]) exp_seqres = "".join([trg_ent["seqres"] \ for trg_ent in mdlcf_json["target_entities"]]) assert ent_seqres == exp_seqres, f"Bad seqres {mdl_id}" print(f" ... done with {mdl_id} ({timer()-pdb_start:.2f}s).") def _main(): """Run as script.""" opts = _parse_args() # parse/fetch global data metadata_full = _get_metadata(opts.metadata_file) pdb_files_split = _get_pdb_files(opts.model_base_dir) if opts.refseq_path is not None: refseqs = io.LoadSequenceList(opts.refseq_path) else: refseqs = None # get on with models print(f"Working on {opts.prefix}*...") # iterate over models for f_name in sorted(pdb_files_split): if f_name.startswith(opts.prefix): _translate2modelcif( f_name, opts, metadata_full[metadata_full.ID == f_name], pdb_files_split[f_name], refseqs.FindSequence(f_name) if refseqs is not None else None, ) print(f"... done with {opts.prefix}*.") if __name__ == "__main__": _main()