#! /usr/local/bin/ost
"""Translate Niko's models from PDB + extra data into ModelCIF."""

# EXAMPLES for running:
"""
GT test setup:
ost scripts/translate2modelcif.py "./PKG Cut" "./PKG/config.json" \
    "./PKG/model_archive_metadata.csv" \
    "./PKG/Spongilla_lacustris_translated_proteome_fixed.fasta" \
    --out_dir="./modelcif"

Setup on work box for full translation:
ost scripts/translate2modelcif.py "./PKG-all" "./PKG/config.json" \
    "./PKG/model_archive_metadata.csv" \
    "./PKG/Spongilla_lacustris_translated_proteome_fixed.fasta" \
    --out_dir="./modelcif" --compress
"""

import argparse
import datetime
import os
import sys
import gzip
import shutil
import zipfile

from timeit import default_timer as timer

import numpy as np
import requests
import ujson as json
import pandas as pd

import ihm
import ihm.citations

import modelcif
import modelcif.associated
import modelcif.data
import modelcif.dumper
import modelcif.model
import modelcif.protocol
import modelcif.qa_metric
import modelcif.reference

from ost import io


def _parse_args():
    """Parse command line arguments."""
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=__doc__,
    )

    parser.add_argument(
        "model_dir",
        type=str,
        metavar="<MODEL DIR>",
        help="Directory with model(s) to be translated.",
    )
    parser.add_argument(
        "config_file",
        type=str,
        metavar="<CONFIG FILE>",
        help="Path to JSON file with ColabFold config.",
    )
    parser.add_argument(
        "metadata_file",
        type=str,
        metavar="<METADATA FILE>",
        help="Path to CSV file with metadata.",
    )
    parser.add_argument(
        "fasta_file",
        type=str,
        metavar="<FASTA FILE>",
        help="Path to FASTA file with sequences.",
    )
    parser.add_argument(
        "--out_dir",
        type=str,
        metavar="<OUTPUT DIR>",
        default="",
        help="Path to a separate directory to store results "
        "(model_dir is used, if none given).",
    )
    parser.add_argument(
        "--compress",
        default=False,
        action="store_true",
        help="Compress ModelCIF file with gzip "
        "(note that QA file is zipped either way).",
    )

    opts = parser.parse_args()

    # check that model dir exists
    if opts.model_dir.endswith("/"):
        opts.model_dir = opts.model_dir[:-1]
    if not os.path.exists(opts.model_dir):
        _abort_msg(f"Model directory '{opts.model_dir}' does not exist.")
    if not os.path.isdir(opts.model_dir):
        _abort_msg(f"Path '{opts.model_dir}' does not point to a directory.")
    # check config_file
    if not os.path.exists(opts.config_file):
        _abort_msg(f"Config file '{opts.config_file}' does not exist.")
    if not os.path.isfile(opts.config_file):
        _abort_msg(f"Path '{opts.config_file}' does not point to a file.")
    # check metadata_file
    if not os.path.exists(opts.metadata_file):
        _abort_msg(f"Metadata file '{opts.metadata_file}' does not exist.")
    if not os.path.isfile(opts.metadata_file):
        _abort_msg(f"Path '{opts.metadata_file}' does not point to a file.")
    # check fasta_file
    if not os.path.exists(opts.fasta_file):
        _abort_msg(f"FASTA file '{opts.fasta_file}' does not exist.")
    if not os.path.isfile(opts.fasta_file):
        _abort_msg(f"Path '{opts.fasta_file}' does not point to a file.")
    # check out_dir
    if not opts.out_dir:
        opts.out_dir = opts.model_dir
    else:
        if not os.path.exists(opts.out_dir):
            _abort_msg(f"Output directory '{opts.out_dir}' does not exist.")
        if not os.path.isdir(opts.out_dir):
            _abort_msg(f"Path '{opts.out_dir}' does not point to a directory.")

    return opts


# pylint: disable=too-few-public-methods
class _GlobalPTM(modelcif.qa_metric.Global, modelcif.qa_metric.PTM):
    """Predicted accuracy according to the TM-score in [0,1]"""

    name = "pTM"
    software = None


class _GlobalPLDDT(modelcif.qa_metric.Global, modelcif.qa_metric.PLDDT):
    """Predicted accuracy according to the CA-only lDDT in [0,100]"""

    name = "pLDDT"
    software = None

class _LocalPLDDT(modelcif.qa_metric.Local, modelcif.qa_metric.PLDDT):
    """Predicted accuracy according to the CA-only lDDT in [0,100]"""

    name = "pLDDT"
    software = None


class _PAE(modelcif.qa_metric.MetricType):
    """Predicted aligned error (in Angstroms)"""

    type = "PAE"
    other_details = None


class _LocalPairwisePAE(modelcif.qa_metric.LocalPairwise, _PAE):
    """Predicted aligned error (in Angstroms)"""

    name = "PAE"
    software = None


# pylint: enable=too-few-public-methods


class _OST2ModelCIF(modelcif.model.AbInitioModel):
    """Map OST entity elements to ihm.model"""

    def __init__(self, *args, **kwargs):
        """Initialise a model"""
        self.ost_entity = kwargs.pop("ost_entity")
        self.asym = kwargs.pop("asym")
        super().__init__(*args, **kwargs)

    def get_atoms(self):
        # ToDo [internal]: Take B-factor out since it's not a B-factor?
        for atm in self.ost_entity.atoms:
            yield modelcif.model.Atom(
                asym_unit=self.asym[atm.chain.name],
                seq_id=atm.residue.number.num,
                atom_id=atm.name,
                type_symbol=atm.element,
                x=atm.pos[0],
                y=atm.pos[1],
                z=atm.pos[2],
                het=atm.is_hetatom,
                biso=atm.b_factor,
                occupancy=atm.occupancy,
            )

    def add_scores(self, scores_json, entry_id, mdl_name):
        """Add QA metrics from AF2 scores."""
        # global scores
        self.qa_metrics.extend(
            (
                _GlobalPLDDT(np.mean(scores_json["plddt"])),
                _GlobalPTM(scores_json["ptm"]),
            )
        )

        # local scores
        lpae = []
        i = 0
        for chn_i in self.ost_entity.chains:
            for res_i in chn_i.residues:
                # local pLDDT
                self.qa_metrics.append(
                    _LocalPLDDT(
                        self.asym[chn_i.name].residue(res_i.number.num),
                        scores_json["plddt"][i],
                    )
                )

                # pairwise aligned error
                j = 0
                # We do a 2nd iteration over the OST entity above doing index
                # magic because it keeps the code cleaner and is only around
                # 0.3s slower than iterating the array directly. Majority of
                # time goes into writing files, anyway.
                for chn_j in self.ost_entity.chains:
                    for res_j in chn_j.residues:
                        lpae.append(
                            _LocalPairwisePAE(
                                self.asym[chn_i.name].residue(
                                    res_i.number.num
                                ),
                                self.asym[chn_j.name].residue(
                                    res_j.number.num
                                ),
                                scores_json["pae"][i][j],
                            )
                        )
                        j += 1

                i += 1

        self.qa_metrics.extend(lpae)

        ac_file = f"{mdl_name}_local_pairwise_qa.cif"
        qa_file = modelcif.associated.LocalPairwiseQAScoresFile(
            ac_file,
            categories=["_ma_qa_metric_local_pairwise"],
            copy_categories=["_ma_qa_metric"],
            entry_id=entry_id,
            entry_details="This file is an associated file consisting "
            + "of local pairwise QA metrics. This is a partial mmCIF "
            + "file and can be validated by merging with the main "
            + "mmCIF file containing the model coordinates and other "
            + "associated data.",
            details="Predicted aligned error",
        )
        return modelcif.associated.Repository(
            "",
            [
                modelcif.associated.ZipFile(
                    f"{mdl_name}.zip", files=[qa_file]
                )
            ],
        )
        # NOTE: by convention MA expects zip file with same name as model-cif

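# NOTE (illustrative only): _OST2ModelCIF.add_scores() above relies on just
# three keys of the per-model ColabFold scores JSON; a minimal file it could
# digest would look roughly like
#   {
#     "plddt": [87.3, 91.0, ...],      <- one value per residue, in [0, 100]
#     "ptm": 0.82,                     <- single global value in [0, 1]
#     "pae": [[0.4, 1.2, ...], ...]    <- n_res x n_res matrix in Angstroms
#   }
# The numbers shown are made up; any additional keys in the JSON are simply
# carried along untouched (see _get_scores() below).
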
def _abort_msg(msg, exit_code=1):
    """Write error message and exit with exit_code."""
    print(f"{msg}\nAborting.", file=sys.stderr)
    sys.exit(exit_code)


def _check_file(file_path):
    """Make sure a file exists and is actually a file."""
    if not os.path.exists(file_path):
        _abort_msg(f"File not found: '{file_path}'.")
    if not os.path.isfile(file_path):
        _abort_msg(f"File path does not point to file: '{file_path}'.")


def _check_model_extra_files_present(model_dir, pdb_file):
    """Check that all files needed to process this model are present."""
    filestem = os.path.splitext(pdb_file)[0]
    prfx = os.path.join(model_dir, filestem)
    scrs = f"{prfx}_scores.json"
    _check_file(scrs)
    return prfx, filestem


def _get_audit_authors():
    """Return the list of authors that produced this model."""
    return (
        "Papadopoulos, Nikolaos",
        "Ruperti, Fabian",
        "Musser, Jacob",
        "Mirdita, Milot",
        "Steinegger, Martin",
        "Arendt, Detlev",
    )


def _get_metadata(metadata_file):
    """Read CSV file with metadata and prepare for next steps."""
    metadata = pd.read_csv(metadata_file)
    # make sure titles are unique
    assert len(set(metadata.title)) == metadata.shape[0]
    # expected title format c.._.._..
    return metadata.drop(columns=metadata.columns[0]).set_index("title")


def _get_sequences(fasta_file, metadata):
    """Read FASTA file with all sequences and check/compare with metadata."""
    seqs = io.LoadSequenceList(fasta_file)
    # expected format of sequence names: [TITLE]_...
    seq_dict = {"_".join(s.name.split("_")[:3]): s.string for s in seqs}
    only_in_metadata = set(metadata.index) - set(seq_dict)
    if only_in_metadata:
        print("PROTEINS ONLY in metadata:", sorted(only_in_metadata))
    only_in_seqs = set(seq_dict) - set(metadata.index)
    if only_in_seqs:
        print("PROTEINS ONLY in fasta_file:", sorted(only_in_seqs))
    return seq_dict

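# NOTE (hypothetical example): with the "[TITLE]_..." naming convention handled
# in _get_sequences() above, a FASTA entry named e.g. "c1_t2_m3_some_suffix"
# is keyed by its first three '_'-separated fields, i.e. "c1_t2_m3", and that
# key must match a "title" value in the metadata CSV. The identifier is made
# up purely to illustrate the splitting.
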
def _parse_colabfold_config(cnfg_file):
    """Read config.json and fetch relevant data from it."""
    # NOTE: following code from
    # https://github.com/sokrypton/ColabFold/blob/main/colabfold/batch.py
    # to understand config

    # fetch and drop fields which are not relevant for model building
    with open(cnfg_file, encoding="utf8") as jfh:
        cf_config = json.load(jfh)
    if "num_queries" in cf_config:
        del cf_config["num_queries"]

    # fetch relevant data
    # -> MSA mode
    if cf_config["msa_mode"] == "MMseqs2 (UniRef+Environmental)":
        seq_dbs = ["UniRef", "Environmental"]
        use_mmseqs = True
        use_msa = True
    elif cf_config["msa_mode"] == "MMseqs2 (UniRef only)":
        seq_dbs = ["UniRef"]
        use_mmseqs = True
        use_msa = True
    elif cf_config["msa_mode"] == "single_sequence":
        seq_dbs = []
        use_mmseqs = False
        use_msa = False
    elif cf_config["msa_mode"] == "custom":
        print(
            "WARNING: Custom MSA mode used. "
            "Not clear from config what to do here!"
        )
        seq_dbs = []
        use_mmseqs = False
        use_msa = True
    else:
        raise ValueError(f"Unknown msa_mode {cf_config['msa_mode']}")

    # -> model type
    if cf_config["model_type"] == "AlphaFold2-multimer-v1":
        # AF-Multimer as introduced in AlphaFold v2.1.0
        use_multimer = True
        multimer_version = 1
    elif cf_config["model_type"] == "AlphaFold2-multimer-v2":
        # AF-Multimer as introduced in AlphaFold v2.2.0
        use_multimer = True
        multimer_version = 2
    elif cf_config["model_type"] == "AlphaFold2-ptm":
        use_multimer = False
        multimer_version = None
    else:
        raise ValueError(f"Unknown model_type {cf_config['model_type']}")

    # write description
    description = f"Model generated using ColabFold v{cf_config['version']}"
    if use_multimer:
        description += f" with AlphaFold-Multimer (v{multimer_version})"
    else:
        description += " with AlphaFold"
    description += (
        f" producing {cf_config['num_models']} models"
        f" with {cf_config['num_recycles']} recycles each"
    )
    if cf_config["use_amber"]:
        description += ", with AMBER relaxation"
    else:
        description += ", without model relaxation"
    if cf_config["use_templates"]:
        print(
            "WARNING: ColabFold may use PDB70 or custom templates. "
            "Not clear from config!"
        )
        description += ", using templates"
    else:
        description += ", without templates"
    if cf_config["rank_by"] == "plddt":
        description += ", ranked by pLDDT"
    elif cf_config["rank_by"] == "ptmscore":
        description += ", ranked by pTM"
    elif cf_config["rank_by"] == "multimer":
        description += ", ranked by ipTM*0.8+pTM*0.2"
    else:
        raise ValueError(f"Unknown rank_by {cf_config['rank_by']}")
    if use_msa:
        description += ", starting from"
        if use_mmseqs:
            msa_type = "MSA"
        else:
            msa_type = "custom MSA"
        if use_multimer:
            if cf_config["pair_mode"] == "unpaired+paired":
                description += f" paired and unpaired {msa_type}s"
            elif cf_config["pair_mode"] == "paired":
                description += f" paired {msa_type}s"
            elif cf_config["pair_mode"] == "unpaired":
                description += f" unpaired {msa_type}s"
            else:
                raise ValueError(
                    f"Unknown pair_mode {cf_config['pair_mode']}"
                )
        else:
            description += f" an {msa_type}"
        if use_mmseqs:
            description += f" from MMseqs2 ({'+'.join(seq_dbs)})"
    else:
        description += " without an MSA"
    description += "."

    return {
        "config": cf_config,
        "seq_dbs": seq_dbs,
        "use_mmseqs": use_mmseqs,
        "use_msa": use_msa,
        "use_multimer": use_multimer,
        "multimer_version": multimer_version,
        "description": description,
    }

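# NOTE (illustrative only): _parse_colabfold_config() above touches only the
# keys accessed in the code; a config.json it can digest could look like
#   {
#     "version": "1.2.0",
#     "msa_mode": "MMseqs2 (UniRef+Environmental)",
#     "model_type": "AlphaFold2-ptm",
#     "num_models": 5,
#     "num_recycles": 3,
#     "use_amber": false,
#     "use_templates": false,
#     "rank_by": "plddt",
#     "pair_mode": "unpaired+paired",
#     "num_queries": 1
#   }
# The values here are invented for illustration and do not describe the run
# that produced the models.
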
def _get_protocol_steps_and_software(config_data):
    """Create the list of protocol steps with software and parameters used."""
    protocol = []

    # modelling step
    step = {
        "method_type": "modeling",
        "name": None,
        "details": config_data["description"],
    }
    # get input data
    # Must refer to data already in the JSON, so we try keywords
    step["input"] = "target_sequences"
    # get output data
    # Must refer to existing data, so we try keywords
    step["output"] = "model"
    # get software
    step["software"] = [
        {
            "name": "ColabFold",
            "classification": "model building",
            "description": "Structure prediction",
            "citation": ihm.citations.colabfold,
            "location": "https://github.com/sokrypton/ColabFold",
            "type": "package",
            "version": "1.2.0",
        }
    ]
    if config_data["use_mmseqs"]:
        step["software"].append(
            {
                "name": "MMseqs2",
                "classification": "data collection",
                "description": "Many-against-Many sequence searching",
                "citation": ihm.Citation(
                    pmid="30615063",
                    title="MMseqs2 desktop and local web server app for "
                    + "fast, interactive sequence searches.",
                    journal="Bioinformatics",
                    volume=35,
                    page_range=(2856, 2858),
                    year=2019,
                    authors=[
                        "Mirdita, M.",
                        "Steinegger, M.",
                        "Soeding, J.",
                    ],
                    doi="10.1093/bioinformatics/bty1057",
                ),
                "location": "https://github.com/soedinglab/mmseqs2",
                "type": "package",
                "version": None,
            }
        )
    if config_data["use_multimer"]:
        step["software"].append(
            {
                "name": "AlphaFold-Multimer",
                "classification": "model building",
                "description": "Structure prediction",
                "citation": ihm.Citation(
                    pmid=None,
                    title="Protein complex prediction with "
                    + "AlphaFold-Multimer.",
                    journal="bioRxiv",
                    volume=None,
                    page_range=None,
                    year=2021,
                    authors=[
                        "Evans, R.",
                        "O'Neill, M.",
                        "Pritzel, A.",
                        "Antropova, N.",
                        "Senior, A.",
                        "Green, T.",
                        "Zidek, A.",
                        "Bates, R.",
                        "Blackwell, S.",
                        "Yim, J.",
                        "Ronneberger, O.",
                        "Bodenstein, S.",
                        "Zielinski, M.",
                        "Bridgland, A.",
                        "Potapenko, A.",
                        "Cowie, A.",
                        "Tunyasuvunakool, K.",
                        "Jain, R.",
                        "Clancy, E.",
                        "Kohli, P.",
                        "Jumper, J.",
                        "Hassabis, D.",
                    ],
                    doi="10.1101/2021.10.04.463034",
                ),
                "location": "https://github.com/deepmind/alphafold",
                "type": "package",
                "version": None,
            }
        )
    else:
        step["software"].append(
            {
                "name": "AlphaFold",
                "classification": "model building",
                "description": "Structure prediction",
                "citation": ihm.citations.alphafold2,
                "location": "https://github.com/deepmind/alphafold",
                "type": "package",
                "version": None,
            }
        )
    step["software_parameters"] = config_data["config"]

    protocol.append(step)

    return protocol


def _get_title(mdl_title):
    """Get a title for this modelling experiment."""
    return f"CoFFE model and functional annotation for {mdl_title}"


def _get_model_details(mdl_desc, mdl_annos):
    """Get the model description."""
    return f"{mdl_desc}\n\n{mdl_annos}"


def _get_model_group_name():
    """Get a name for a model group."""
    return None


def _get_sequence(chn):
    """Get the sequence out of an OST chain."""
    # initialise
    lst_rn = chn.residues[0].number.num
    idx = 1
    sqe = chn.residues[0].one_letter_code
    if lst_rn != 1:
        # chain does not start at residue 1: pad with a leading gap and let
        # the loop below handle the first residue as well
        sqe = "-"
        lst_rn = 1
        idx = 0

    for res in chn.residues[idx:]:
        lst_rn += 1
        while lst_rn != res.number.num:
            sqe += "-"
            lst_rn += 1
        sqe += res.one_letter_code

    return sqe


def _check_sequence(up_ac, sequence):
    """Verify sequence to only contain standard olc."""
    for res in sequence:
        if res not in "ACDEFGHIKLMNPQRSTVWY":
            raise RuntimeError(
                "Non-standard aa found in UniProtKB sequence "
                + f"for entry '{up_ac}': {res}"
            )

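# NOTE (format illustration only): _fetch_upkb_entry() below parses the
# UniProtKB flat-text format line by line; the relevant line types look like
#   ID   SOMEID_HUMAN            Reviewed;         333 AA.
#   DT   01-OCT-1989, sequence version 1.
#   GN   Name=SOMEGENE;
#   OS   Homo sapiens (Human).
#   OX   NCBI_TaxID=9606;
#   SQ   SEQUENCE   333 AA;  37000 MW;  0123456789ABCDEF CRC64;
#        MKWVTFISLL FLFSSAYSRG VFRRDAHKSE ...
# Identifiers and numbers above are placeholders showing the layout, not real
# entries used by this script.
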
def _fetch_upkb_entry(up_ac):
    """Fetch data for a UniProtKB entry."""
    # This is a simple parser for the UniProtKB txt format; instead of
    # breaking it up into multiple functions, we just allow many branches &
    # statements here.
    # pylint: disable=too-many-branches,too-many-statements
    data = {}
    data["up_organism"] = ""
    data["up_sequence"] = ""
    data["up_ac"] = up_ac
    rspns = requests.get(f"https://www.uniprot.org/uniprot/{up_ac}.txt")
    for line in rspns.iter_lines(decode_unicode=True):
        if line.startswith("ID   "):
            sline = line.split()
            if len(sline) != 5:
                _abort_msg(f"Unusual UniProtKB ID line found:\n'{line}'")
            data["up_id"] = sline[1]
        elif line.startswith("OX   NCBI_TaxID="):
            # Following strictly the UniProtKB format: 'OX   NCBI_TaxID=<ID>;'
            data["up_ncbi_taxid"] = line[len("OX   NCBI_TaxID=") : -1]
            data["up_ncbi_taxid"] = (
                data["up_ncbi_taxid"].split("{")[0].strip()
            )
        elif line.startswith("OS   "):
            if line[-1] == ".":
                data["up_organism"] += line[len("OS   ") : -1]
            else:
                data["up_organism"] += line[len("OS   ") : -1] + " "
        elif line.startswith("SQ   "):
            sline = line.split()
            if len(sline) != 8:
                _abort_msg(f"Unusual UniProtKB SQ line found:\n'{line}'")
            data["up_seqlen"] = int(sline[2])
            data["up_crc64"] = sline[6]
        elif line.startswith("     "):
            sline = line.split()
            if len(sline) > 6:
                _abort_msg(
                    "Unusual UniProtKB sequence data line "
                    + f"found:\n'{line}'"
                )
            data["up_sequence"] += "".join(sline)
        elif line.startswith("RP   "):
            if "ISOFORM" in line.upper():
                raise RuntimeError(
                    f"First ISOFORM found for '{up_ac}', needs handling."
                )
        elif line.startswith("DT   "):
            # e.g. '03-OCT-2012, sequence version 2.'
            dt_flds = line[len("DT   ") :].split(", ")
            if dt_flds[1].upper().startswith("SEQUENCE VERSION "):
                data["up_last_mod"] = datetime.datetime.strptime(
                    dt_flds[0], "%d-%b-%Y"
                )
        elif line.startswith("GN   Name="):
            data["up_gn"] = line[len("GN   Name=") :].split(";")[0]
            data["up_gn"] = data["up_gn"].split("{")[0].strip()

    # we have not seen isoforms in the data set, yet, so we just set it to
    # None
    data["up_isoform"] = None
data["up_isoform"] = None if "up_gn" not in data: _abort_msg(f"No gene name found for UniProtKB entry '{up_ac}'.") if "up_last_mod" not in data: _abort_msg(f"No sequence version found for UniProtKB entry '{up_ac}'.") if "up_crc64" not in data: _abort_msg(f"No CRC64 value found for UniProtKB entry '{up_ac}'.") if len(data["up_sequence"]) == 0: _abort_msg(f"No sequence found for UniProtKB entry '{up_ac}'.") # check that sequence length and CRC64 is correct if data["up_seqlen"] != len(data["up_sequence"]): _abort_msg( "Sequence length of SQ line and sequence data differ for " + f"UniProtKB entry '{up_ac}': {data['up_seqlen']} != " + f"{len(data['up_sequence'])}" ) _check_sequence(data["up_ac"], data["up_sequence"]) if "up_id" not in data: _abort_msg(f"No ID found for UniProtKB entry '{up_ac}'.") if "up_ncbi_taxid" not in data: _abort_msg(f"No NCBI taxonomy ID found for UniProtKB entry '{up_ac}'.") if len(data["up_organism"]) == 0: _abort_msg(f"No organism species found for UniProtKB entry '{up_ac}'.") return data def _get_upkb_for_sequence(sqe, up_ac): """Get UniProtKB entry data for given sequence.""" up_data = _fetch_upkb_entry(up_ac) if sqe != up_data["up_sequence"]: raise RuntimeError( f"Sequences not equal from file: {sqe}, from UniProtKB: " + f"{up_data['up_sequence']}" ) return up_data def _get_entities(pdb_file, mdl_title, seq_dict): """Gather data for the mmCIF (target) entities.""" ost_ent = io.LoadPDB(pdb_file) # sanity checks if ost_ent.chain_count != 1: raise RuntimeError( f"Unexpected oligomer for {mdl_title}" ) chn = ost_ent.chains[0] sqe = _get_sequence(chn) if sqe != seq_dict[mdl_title]: raise RuntimeError( f"Sequence for {mdl_title} not as expected; from file: {sqe}" \ f", from seq. list: {seq_dict[mdl_title]}" ) # fill entities entities = [{ "pdb_sequence": sqe, "pdb_chain_id": chn.name, "description": f"Spongilla lacustris {mdl_title} protein" }] return entities, ost_ent def _get_scores(data, prfx): """Check that all files needed to process this model are present.""" scrs_fle = f"{prfx}_scores.json" with open(scrs_fle, encoding="utf8") as jfh: scrs_json = json.load(jfh) # NOTE for reuse of data when iterating multiple models: this will overwrite # scores in data but will not delete any scores if prev. models had more... data.update(scrs_json) def _get_modelcif_entities(target_ents, source, asym_units, system): """Create ModelCIF entities and asymmetric units.""" for cif_ent in target_ents: mdlcif_ent = modelcif.Entity( cif_ent["pdb_sequence"], description=cif_ent["description"], source=source ) asym_units[cif_ent["pdb_chain_id"]] = modelcif.AsymUnit( mdlcif_ent ) system.target_entities.append(mdlcif_ent) def _assemble_modelcif_software(soft_dict): """Create a modelcif.Software instance from dictionary.""" return modelcif.Software( soft_dict["name"], soft_dict["classification"], soft_dict["description"], soft_dict["location"], soft_dict["type"], soft_dict["version"], citation=soft_dict["citation"] ) def _get_sequence_dbs(seq_dbs): """Get ColabFold seq. 
DBs.""" # NOTE: hard coded for ColabFold versions before 2022/07/13 # -> afterwards UniRef30 updated to 2022_02 (and maybe more changes) db_dict = { "UniRef": modelcif.ReferenceDatabase( "UniRef30", "http://wwwuser.gwdg.de/~compbiol/colabfold/uniref30_2103.tar.gz", version="2021_03" ), "Environmental": modelcif.ReferenceDatabase( "ColabFold DB", "http://wwwuser.gwdg.de/~compbiol/colabfold/colabfold_envdb_202108.tar.gz", version="2021_08" ) } return [db_dict[seq_db] for seq_db in seq_dbs] def _get_modelcif_protocol(protocol_steps, target_entities, model, ref_dbs): """Create the protocol for the ModelCIF file.""" protocol = modelcif.protocol.Protocol() for js_step in protocol_steps: sftwre = None if js_step["software"]: if len(js_step["software"]) == 1: sftwre = _assemble_modelcif_software(js_step["software"][0]) else: sftwre = [] for sft in js_step["software"]: sftwre.append(_assemble_modelcif_software(sft)) sftwre = modelcif.SoftwareGroup(elements=sftwre) if js_step["software_parameters"]: params = [] for k, v in js_step["software_parameters"].items(): params.append( modelcif.SoftwareParameter(k, v) ) if isinstance(sftwre, modelcif.SoftwareGroup): sftwre.parameters = params else: sftwre = modelcif.SoftwareGroup( elements=(sftwre,), parameters=params ) if js_step["input"] == "target_sequences": input_data = modelcif.data.DataGroup(target_entities) input_data.extend(ref_dbs) elif js_step["input"] == "model": input_data = model else: raise RuntimeError(f"Unknown protocol input: '{js_step['input']}'") if js_step["output"] == "model": output_data = model else: raise RuntimeError( f"Unknown protocol output: '{js_step['output']}'" ) protocol.steps.append( modelcif.protocol.Step( input_data=input_data, output_data=output_data, name=js_step["name"], details=js_step["details"], software=sftwre, ) ) protocol.steps[-1].method_type = js_step["method_type"] return protocol def _compress_cif_file(cif_file): """Compress cif file and delete original.""" with open(cif_file, 'rb') as f_in: with gzip.open(cif_file + '.gz', 'wb') as f_out: shutil.copyfileobj(f_in, f_out) os.remove(cif_file) def _package_associated_files(mdl_name): """Compress associated files into single zip file and delete original.""" # file names must match ones from add_scores zip_path = f"{mdl_name}.zip" files = [f"{mdl_name}_local_pairwise_qa.cif"] # zip settings tested for good speed vs compression with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_BZIP2) as myzip: for file in files: myzip.write(file) os.remove(file) def _store_as_modelcif(data_json, ost_ent, out_dir, mdl_name, compress): """Mix all the data into a ModelCIF file.""" print(" generating ModelCIF objects...", end="") pstart = timer() # create system to gather all the data system = modelcif.System( title=data_json["title"], id=data_json["mdl_title"].upper(), model_details=data_json["model_details"], ) # create target entities, references, source, asymmetric units & assembly # for source we assume all chains come from the same taxon source = ihm.source.Natural( ncbi_taxonomy_id=data_json["tax_info"]["up_ncbi_taxid"], scientific_name=data_json["tax_info"]["up_organism"], ) # create an asymmetric unit and an entity per target sequence asym_units = {} _get_modelcif_entities( data_json["target_entities"], source, asym_units, system ) assembly = modelcif.Assembly( asym_units.values() ) # audit_authors system.authors.extend(data_json["audit_authors"]) # set up the model to produce coordinates model = _OST2ModelCIF( assembly=assembly, asym=asym_units, ost_entity=ost_ent, name=f"Model 
def _store_as_modelcif(data_json, ost_ent, out_dir, mdl_name, compress):
    """Mix all the data into a ModelCIF file."""
    print(" generating ModelCIF objects...", end="")
    pstart = timer()
    # create system to gather all the data
    system = modelcif.System(
        title=data_json["title"],
        id=data_json["mdl_title"].upper(),
        model_details=data_json["model_details"],
    )

    # create target entities, references, source, asymmetric units & assembly
    # for source we assume all chains come from the same taxon
    source = ihm.source.Natural(
        ncbi_taxonomy_id=data_json["tax_info"]["up_ncbi_taxid"],
        scientific_name=data_json["tax_info"]["up_organism"],
    )
    # create an asymmetric unit and an entity per target sequence
    asym_units = {}
    _get_modelcif_entities(
        data_json["target_entities"], source, asym_units, system
    )
    assembly = modelcif.Assembly(asym_units.values())

    # audit_authors
    system.authors.extend(data_json["audit_authors"])

    # set up the model to produce coordinates
    model = _OST2ModelCIF(
        assembly=assembly,
        asym=asym_units,
        ost_entity=ost_ent,
        name=f"Model {data_json['mdl_num']} (top ranked model)",
    )
    print(f" ({timer()-pstart:.2f}s)")

    print(" processing QA scores...", end="", flush=True)
    pstart = timer()
    system.repositories.append(
        model.add_scores(data_json, system.id, mdl_name)
    )
    print(f" ({timer()-pstart:.2f}s)")

    model_group = modelcif.model.ModelGroup(
        [model], name=data_json["model_group_name"]
    )
    system.model_groups.append(model_group)

    ref_dbs = _get_sequence_dbs(data_json["config_data"]["seq_dbs"])
    protocol = _get_modelcif_protocol(
        data_json["protocol"], system.target_entities, model, ref_dbs
    )
    system.protocols.append(protocol)

    # write modelcif System to file
    print(" write to disk...", end="", flush=True)
    pstart = timer()
    # NOTE: this will dump PAE on path provided in add_scores
    # -> hence we cheat by changing path and back while being exception-safe...
    oldpwd = os.getcwd()
    os.chdir(out_dir)
    try:
        with open(f"{mdl_name}.cif", "w", encoding="ascii") as mmcif_fh:
            modelcif.dumper.write(mmcif_fh, [system])
        _package_associated_files(mdl_name)
        if compress:
            _compress_cif_file(f"{mdl_name}.cif")
    finally:
        os.chdir(oldpwd)
    print(f" ({timer()-pstart:.2f}s)")


def _create_json(config_data):
    """Create a dictionary (mimicking JSON) that contains data which is the
    same for all models."""
    data = {}
    data["audit_authors"] = _get_audit_authors()
    data["protocol"] = _get_protocol_steps_and_software(config_data)
    data["config_data"] = config_data
    return data


def _create_model_json(data, pdb_file, seq_dict, metadata):
    """Create a dictionary (mimicking JSON) that contains all the data."""
    data["target_entities"], ost_ent = _get_entities(
        pdb_file, data["mdl_title"], seq_dict
    )
    data["title"] = _get_title(data["mdl_title"])
    mdl_desc = data["config_data"]["description"]
    mdl_annos = metadata.loc[data["mdl_title"], "description"]
    data["model_details"] = _get_model_details(mdl_desc, mdl_annos)
    data["model_group_name"] = _get_model_group_name()

    return ost_ent

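# NOTE (hypothetical example): _main() below expects each model file in
# <MODEL DIR> to follow the ColabFold naming scheme
#   [TITLE]_unrelaxed_rank_1_model_3.pdb        (rank 1 models only)
# accompanied by a matching score file
#   [TITLE]_unrelaxed_rank_1_model_3_scores.json
# where [TITLE] itself consists of three '_'-separated fields; the rank and
# model numbers shown are placeholders.
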
def _main():
    """Run as script."""
    opts = _parse_args()

    # parse/fetch global data
    config_data = _parse_colabfold_config(opts.config_file)
    metadata = _get_metadata(opts.metadata_file)
    seq_dict = _get_sequences(opts.fasta_file, metadata)
    if opts.compress:
        cifext = "cif.gz"
    else:
        cifext = "cif"
    # fetch a single randomly chosen UniProt entry for tax info
    upkb_data = _fetch_upkb_entry("P42690")
    tax_info = {
        k: v
        for k, v in upkb_data.items()
        if k in ["up_organism", "up_ncbi_taxid"]
    }

    # get on with models
    print(f"Working on {opts.model_dir}...")

    # iterate model directory
    for fle in sorted(os.listdir(opts.model_dir)):
        # iterate PDB files
        if not fle.endswith(".pdb"):
            continue
        # check file and if done already
        file_prfx, mdl_name = _check_model_extra_files_present(
            opts.model_dir, fle
        )
        fle = os.path.join(opts.model_dir, fle)
        if os.path.exists(os.path.join(opts.out_dir, f"{mdl_name}.{cifext}")):
            print(f" {mdl_name} already done...")
            continue

        print(f" translating {mdl_name}...")
        pdb_start = timer()

        # NOTE: could also be prepared globally if all carefully overwritten
        # but not worth the trouble...
        mdlcf_json = _create_json(config_data)
        mdlcf_json["tax_info"] = tax_info

        # mdl_name = [TITLE]_unrelaxed_rank_X_model_Y.pdb
        mdl_name_parts = mdl_name.split("_")
        assert len(mdl_name_parts) == 8
        assert int(mdl_name_parts[5]) == 1  # rank 1 only
        mdlcf_json["mdl_num"] = int(mdl_name_parts[7])
        mdlcf_json["mdl_title"] = "_".join(mdl_name_parts[:3])

        # gather data into JSON-like structure
        print(" preparing data...", end="")
        pstart = timer()
        ost_ent = _create_model_json(mdlcf_json, fle, seq_dict, metadata)

        # read quality scores from JSON file
        _get_scores(mdlcf_json, file_prfx)
        print(f" ({timer()-pstart:.2f}s)")

        _store_as_modelcif(
            mdlcf_json, ost_ent, opts.out_dir, mdl_name, opts.compress
        )
        print(f" ... done with {mdl_name} ({timer()-pdb_start:.2f}s).")

        # check if result can be read and has expected seq.
        ent = io.LoadMMCIF(os.path.join(opts.out_dir, f"{mdl_name}.{cifext}"))
        assert ent.chain_count == 1, f"Bad chain count {mdl_name}"
        ent_seq = "".join(res.one_letter_code for res in ent.residues)
        assert ent_seq == seq_dict[mdlcf_json["mdl_title"]], \
            f"Bad seq. {mdl_name}"

    print(f"... done with {opts.model_dir}.")


if __name__ == "__main__":
    _main()

# LocalWords:  Xabi argparse args ArgumentParser formatter dir str metavar os
# LocalWords:  RawDescriptionHelpFormatter ACs sys msg nAborting stderr acs
# LocalWords:  fle cnfg config json pdb prfx scrs endswith modelcif uid pdbx
# LocalWords:  struct cif utf mmcif fh datetime ost io ToDo chn lst rn idx aa
# LocalWords:  sqe olc ACDEFGHIKLMNPQRSTVWY RuntimeError upkb txt pylint iter
# LocalWords:  rspns unicode startswith sline len elif NCBI TaxID ncbi taxid
# LocalWords:  seqlen crc ISOFORM DT dt flds isoforms isoform ent LoadPDB ihm
# LocalWords:  mdlcf mdlcif asym AsymUnit init kwargs atm pos het hetatom pTM
# LocalWords:  biso ujson GlobalPTM pLDDT ptm jfh numpy np GlobalPLDDT lDDT
# LocalWords:  plddt LocalPLDDT timeit PAE MetricType LocalPairwisePAE lpae
# LocalWords:  nd pae qa pstart ColabFold