diff --git a/translate2modelcif.py b/translate2modelcif.py index 20d7c81f8c149d5224a7c8f197109d5233035ba2..1411f0ff18bedb4a6ec14e97719c1dbe98d94f52 100644 --- a/translate2modelcif.py +++ b/translate2modelcif.py @@ -1,6 +1,17 @@ """Translate models from Tara/ Xabi from PDB + extra data into ModelCIF.""" import argparse +import datetime +import os +import sys + +import requests + +import modelcif +import modelcif.dumper +import modelcif.reference + +from ost import io def _parse_args(): @@ -10,13 +21,303 @@ def _parse_args(): description=__doc__, ) - return parser.parse_args() + parser.add_argument( + "model_dir", + type=str, + metavar="<MODEL DIR>", + help="Directory with model(s) to be translated. Must be of form " + + "'<UniProtKB AC>-<UniProtKB AC>'", + ) + + opts = parser.parse_args() + + # check that model dir exists + if not os.path.exists(opts.model_dir): + _abort_msg(f"Model directory '{opts.model_dir}' does not exist.") + if not os.path.isdir(opts.model_dir): + _abort_msg(f"Path '{opts.model_dir}' does not point to a directory.") + + return opts + + +def _abort_msg(msg, exit_code=1): + """Write error message and exit with exit_code.""" + print(f"{msg}\nAborting.", file=sys.stderr) + sys.exit(exit_code) + + +def _check_file(file_path): + """Make sure a file exists and is actually a file.""" + if not os.path.exists(file_path): + _abort_msg(f"File not found: '{file_path}'.") + if not os.path.isfile(file_path): + _abort_msg(f"File path does not point to file: '{file_path}'.") + + +def _check_interaction_extra_files_present(model_dir): + """Make sure some general files are present for an interaction.""" + cnfg = os.path.join(model_dir, "config.json") + _check_file(cnfg) + + +def _check_model_extra_files_present(model_dir, pdb_file): + """Check that all files needed to process this model are present.""" + uid = os.path.splitext(pdb_file)[0] + prfx = os.path.join(model_dir, uid) + + scrs = f"{prfx}_scores.json" + _check_file(scrs) + + return prfx, uid + + +def 
_get_audit_authors(): + """Return the list of authors that produced this model.""" + # ToDo [input]: get the proper list of authors of the model + return ("Foo B", "Bar F") + + +def _get_title(): + """Get a title for this modelling experiment.""" + # ToDo [input]: Add title + return "struct.title" + + +def _get_model_details(): + """Get the model description.""" + # ToDo [input]: Add model_details + return "struct.pdbx_model_details" + + +def _get_sequence(chn): + """Get the sequence out of an OST chain.""" + # initialise + lst_rn = chn.residues[0].number.num + idx = 1 + sqe = chn.residues[0].one_letter_code + if lst_rn != 1: + sqe = "-" + idx, lst_rn = 0, 1 + + for res in chn.residues[idx:]: + lst_rn += 1 + while lst_rn != res.number.num: + sqe += "-" + lst_rn += 1 + sqe += res.one_letter_code + + return sqe + + +def _check_sequence(up_ac, sequence): + """Verify sequence to only contain standard olc.""" + for res in sequence: + if res not in "ACDEFGHIKLMNPQRSTVWY": + raise RuntimeError( + "Non-standard aa found in UniProtKB sequence " + + f"for entry '{up_ac}': {res}" + ) + + +def _fetch_upkb_entry(up_ac): + """Fetch data for an UniProtKB entry.""" + # This is a simple parser for UniProtKB txt format, instead of breaking it up + # into multiple functions, we just allow many many branches & statements, + # here.
+ # pylint: disable=too-many-branches,too-many-statements + data = {} + data["up_organism"] = "" + data["up_sequence"] = "" + data["up_ac"] = up_ac + rspns = requests.get(f"https://www.uniprot.org/uniprot/{up_ac}.txt", timeout=180) + for line in rspns.iter_lines(decode_unicode=True): + if line.startswith("ID "): + sline = line.split() + if len(sline) != 5: + _abort_msg(f"Unusual UniProtKB ID line found:\n'{line}'") + data["up_id"] = sline[1] + elif line.startswith("OX NCBI_TaxID="): + # Following strictly the UniProtKB format: 'OX NCBI_TaxID=<ID>;' + data["up_ncbi_taxid"] = line[len("OX NCBI_TaxID=") : -1] + data["up_ncbi_taxid"] = data["up_ncbi_taxid"].split("{")[0].strip() + elif line.startswith("OS "): + if line[-1] == ".": + data["up_organism"] += line[len("OS ") : -1] + else: + data["up_organism"] += line[len("OS ") : -1] + " " + elif line.startswith("SQ "): + sline = line.split() + if len(sline) != 8: + _abort_msg(f"Unusual UniProtKB SQ line found:\n'{line}'") + data["up_seqlen"] = int(sline[2]) + data["up_crc64"] = sline[6] + elif line.startswith(" "): + sline = line.split() + if len(sline) > 6: + _abort_msg( + "Unusual UniProtKB sequence data line " + + f"found:\n'{line}'" + ) + data["up_sequence"] += "".join(sline) + elif line.startswith("RP "): + if "ISOFORM" in line.upper(): + raise RuntimeError( + f"First ISOFORM found for '{up_ac}', needs " + "handling." + ) + elif line.startswith("DT "): + # 2012-10-03 + dt_flds = line[len("DT ") :].split(", ") + if dt_flds[1].upper().startswith("SEQUENCE VERSION "): + data["up_last_mod"] = datetime.datetime.strptime( + dt_flds[0], "%d-%b-%Y" + ) + + # we have not seen isoforms in the data set, yet, so we just set them to '.'
+ data["up_isoform"] = None + + if "up_last_mod" not in data: + _abort_msg(f"No sequence version found for UniProtKB entry '{up_ac}'.") + if "up_crc64" not in data: + _abort_msg(f"No CRC64 value found for UniProtKB entry '{up_ac}'.") + if len(data["up_sequence"]) == 0: + _abort_msg(f"No sequence found for UniProtKB entry '{up_ac}'.") + # check that sequence length and CRC64 is correct + if data["up_seqlen"] != len(data["up_sequence"]): + _abort_msg( + "Sequence length of SQ line and sequence data differ for " + + f"UniProtKB entry '{up_ac}': {data['up_seqlen']} != " + + f"{len(data['up_sequence'])}" + ) + _check_sequence(data["up_ac"], data["up_sequence"]) + + if "up_id" not in data: + _abort_msg(f"No ID found for UniProtKB entry '{up_ac}'.") + if "up_ncbi_taxid" not in data: + _abort_msg(f"No NCBI taxonomy ID found for UniProtKB entry '{up_ac}'.") + if len(data["up_organism"]) == 0: + _abort_msg(f"No organism species found for UniProtKB entry '{up_ac}'.") + + return data + + +def _get_upkb_for_sequence(sqe, up_ac): + """Get UniProtKB entry data for given sequence.""" + up_data = _fetch_upkb_entry(up_ac) + if sqe != up_data["up_sequence"]: + raise RuntimeError( + f"Sequences not equal from file: {sqe}, from UniProtKB: " + + f"{up_data['up_sequence']}" + ) + + return up_data + + +def _get_entities(pdb_file, up_acs): + """Gather data for the mmCIF (target) entities.""" + entities = [] + + ost_ent = io.LoadPDB(pdb_file) + for i, chn in enumerate(ost_ent.chains): + cif_ent = {} + sqe = _get_sequence(chn) + upkb = _get_upkb_for_sequence(sqe, up_acs[i]) + cif_ent["cif_sequence"] = sqe + cif_ent.update(upkb) + entities.append(cif_ent) + + return entities + + +def _store_as_modelcif(interaction_name, data_json, file_prfx): + """Mix all the data into a ModelCIF file.""" + # create target references, ... 
+ for cif_ent in data_json["target_entities"]: + # 'cif_sequence' + ref = modelcif.reference.UniProt( + cif_ent["up_id"], + cif_ent["up_ac"], + # careful: alignments are just that easy because of equal sequences! + align_begin=1, + align_end=cif_ent["up_seqlen"], + isoform=cif_ent["up_isoform"], + ncbi_taxonomy_id=cif_ent["up_ncbi_taxid"], + organism_scientific=cif_ent["up_organism"], + sequence_version_date=cif_ent["up_last_mod"], + sequence_crc64=cif_ent["up_crc64"], + ) + + # create system to gather all the data + system = modelcif.System( + title=data_json["title"], + id=interaction_name.upper(), + model_details=data_json["model_details"], + ) + # audit_authors + system.authors.extend(data_json["audit_authors"]) + + # write modelcif System to file + with open(f"{file_prfx}.cif", "w", encoding="utf8") as mmcif_fh: + modelcif.dumper.write(mmcif_fh, [system]) + + +def _create_interaction_json(): + """Create a dictionary (mimicking JSON) that contains data which is the same + for all models.""" + data = {} + + data["audit_authors"] = _get_audit_authors() + data["model_details"] = _get_model_details() + data["title"] = _get_title() + + return data + + +def _create_model_json(data, pdb_file, up_acs): + """Create a dictionary (mimicking JSON) that contains all the data.""" + + data["target_entities"] = _get_entities(pdb_file, up_acs) + return data def _main(): """Run as script.""" - _parse_args() + opts = _parse_args() + + interaction = os.path.split(opts.model_dir)[-1] + print(f"Working on {interaction}...") + + # get UniProtKB ACs from directory name + up_acs = interaction.split("-") + + _check_interaction_extra_files_present(opts.model_dir) + + mdlcf_json = _create_interaction_json() + + # iterate model directory + for fle in os.listdir(opts.model_dir): + # iterate PDB files + if not fle.endswith(".pdb"): + continue + file_prfx, uid = _check_model_extra_files_present(opts.model_dir, fle) + fle = os.path.join(opts.model_dir, fle) + + # gather data into JSON-like 
structure + _create_model_json(mdlcf_json, fle, up_acs) + + _store_as_modelcif(uid, mdlcf_json, file_prfx) + # ToDo [internal]: wipe data or is it overwritten in mdlcf_json? + + print(f"... done with {opts.model_dir}.") if __name__ == "__main__": _main() + +# LocalWords: Xabi argparse args ArgumentParser formatter dir str metavar os +# LocalWords: RawDescriptionHelpFormatter ACs sys msg nAborting stderr acs +# LocalWords: fle cnfg config json pdb prfx scrs endswith modelcif uid pdbx +# LocalWords: struct cif utf mmcif fh datetime ost io ToDo chn lst rn idx aa +# LocalWords: sqe olc ACDEFGHIKLMNPQRSTVWY RuntimeError upkb txt pylint iter +# LocalWords: rspns unicode startswith sline len elif NCBI TaxID ncbi taxid +# LocalWords: seqlen crc ISOFORM DT dt flds isoforms isoform ent LoadPDB +# LocalWords: mdlcf