"""Translate models from Tara/ Xabi from PDB + extra data into ModelCIF."""
import argparse
import datetime
import os
import sys
import requests
import modelcif
import modelcif.dumper
import modelcif.reference
from ost import io


def _parse_args():
    """Parse command line arguments."""
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=__doc__,
    )
    parser.add_argument(
        "model_dir",
        type=str,
        metavar="<MODEL DIR>",
        help="Directory with model(s) to be translated. Must be of form "
        + "'<UniProtKB AC>-<UniProtKB AC>'",
    )
    opts = parser.parse_args()

    # check that model dir exists
    if not os.path.exists(opts.model_dir):
        _abort_msg(f"Model directory '{opts.model_dir}' does not exist.")
    if not os.path.isdir(opts.model_dir):
        _abort_msg(f"Path '{opts.model_dir}' does not point to a directory.")

    return opts


def _abort_msg(msg, exit_code=1):
    """Write error message and exit with exit_code."""
    print(f"{msg}\nAborting.", file=sys.stderr)
    sys.exit(exit_code)


def _check_file(file_path):
    """Make sure a file exists and is actually a file."""
    if not os.path.exists(file_path):
        _abort_msg(f"File not found: '{file_path}'.")
    if not os.path.isfile(file_path):
        _abort_msg(f"File path does not point to file: '{file_path}'.")


def _check_interaction_extra_files_present(model_dir):
    """Make sure some general files are present for an interaction."""
    cnfg = os.path.join(model_dir, "config.json")
    _check_file(cnfg)


def _check_model_extra_files_present(model_dir, pdb_file):
    """Check that all files needed to process this model are present."""
    uid = os.path.splitext(pdb_file)[0]
    prfx = os.path.join(model_dir, uid)
    scrs = f"{prfx}_scores.json"
    _check_file(scrs)
    return prfx, uid
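

# For orientation, the model directory layout implied by the checks above
# looks like this (file names are illustrative, only the patterns matter):
#
#   P12345-Q67890/              <- '<UniProtKB AC>-<UniProtKB AC>'
#       config.json             <- general settings for the interaction
#       model01.pdb             <- a model, any '<uid>.pdb' works
#       model01_scores.json     <- per-model scores, '<uid>_scores.json'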


def _get_audit_authors():
    """Return the list of authors that produced this model."""
    # ToDo [input]: get the proper list of authors of the model
    return ("Foo B", "Bar F")


def _get_title():
    """Get a title for this modelling experiment."""
    # ToDo [input]: Add title
    return "struct.title"


def _get_model_details():
    """Get the model description."""
    # ToDo [input]: Add model_details
    return "struct.pdbx_model_details"


def _get_sequence(chn):
    """Get the sequence out of an OST chain, gaps filled with '-'."""
    # initialise
    lst_rn = chn.residues[0].number.num
    idx = 1
    sqe = chn.residues[0].one_letter_code
    if lst_rn != 1:
        # chain does not start at residue 1: pad with leading gaps and let
        # the loop below handle the first residue, too
        sqe = "-"
        lst_rn = 1
        idx = 0

    for res in chn.residues[idx:]:
        lst_rn += 1
        while lst_rn != res.number.num:
            sqe += "-"
            lst_rn += 1
        sqe += res.one_letter_code

    return sqe
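

# A small illustration of the gap filling in _get_sequence() (hypothetical
# chain with residues numbered 2, 3 and 5, one-letter codes A, B and C):
#
#   _get_sequence(chn)  ->  "-AB-C"
#
# i.e. missing residue numbers, including leading ones, become '-'.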


def _check_sequence(up_ac, sequence):
    """Verify that the sequence only contains standard one-letter codes."""
    for res in sequence:
        if res not in "ACDEFGHIKLMNPQRSTVWY":
            raise RuntimeError(
                "Non-standard aa found in UniProtKB sequence "
                + f"for entry '{up_ac}': {res}"
            )


def _fetch_upkb_entry(up_ac):
    """Fetch data for a UniProtKB entry."""
    # This is a simple parser for the UniProtKB txt format; instead of
    # breaking it up into multiple functions, we allow many branches &
    # statements here.
    # pylint: disable=too-many-branches,too-many-statements
    data = {}
    data["up_organism"] = ""
    data["up_sequence"] = ""
    data["up_ac"] = up_ac
    rspns = requests.get(f"https://www.uniprot.org/uniprot/{up_ac}.txt")
    for line in rspns.iter_lines(decode_unicode=True):
        if line.startswith("ID   "):
            sline = line.split()
            if len(sline) != 5:
                _abort_msg(f"Unusual UniProtKB ID line found:\n'{line}'")
            data["up_id"] = sline[1]
        elif line.startswith("OX   NCBI_TaxID="):
            # Following strictly the UniProtKB format: 'OX   NCBI_TaxID=<ID>;'
            data["up_ncbi_taxid"] = line[len("OX   NCBI_TaxID=") : -1]
            data["up_ncbi_taxid"] = data["up_ncbi_taxid"].split("{")[0].strip()
        elif line.startswith("OS   "):
            if line[-1] == ".":
                data["up_organism"] += line[len("OS   ") : -1]
            else:
                data["up_organism"] += line[len("OS   ") : -1] + " "
        elif line.startswith("SQ   "):
            sline = line.split()
            if len(sline) != 8:
                _abort_msg(f"Unusual UniProtKB SQ line found:\n'{line}'")
            data["up_seqlen"] = int(sline[2])
            data["up_crc64"] = sline[6]
        elif line.startswith("     "):
            sline = line.split()
            if len(sline) > 6:
                _abort_msg(
                    "Unusual UniProtKB sequence data line "
                    + f"found:\n'{line}'"
                )
            data["up_sequence"] += "".join(sline)
        elif line.startswith("RP   "):
            if "ISOFORM" in line.upper():
                raise RuntimeError(
                    f"First ISOFORM found for '{up_ac}', needs handling."
                )
        elif line.startswith("DT   "):
            # e.g. 'DT   03-OCT-2012, sequence version 2.'
            dt_flds = line[len("DT   ") :].split(", ")
            if dt_flds[1].upper().startswith("SEQUENCE VERSION "):
                data["up_last_mod"] = datetime.datetime.strptime(
                    dt_flds[0], "%d-%b-%Y"
                )

    # we have not seen isoforms in the data set yet, so just set them to None
    # (written as '.' in mmCIF)
    data["up_isoform"] = None

    if "up_last_mod" not in data:
        _abort_msg(f"No sequence version found for UniProtKB entry '{up_ac}'.")
    if "up_crc64" not in data:
        _abort_msg(f"No CRC64 value found for UniProtKB entry '{up_ac}'.")
    if len(data["up_sequence"]) == 0:
        _abort_msg(f"No sequence found for UniProtKB entry '{up_ac}'.")
    # check that the sequence length of the SQ line matches the sequence data
    if data["up_seqlen"] != len(data["up_sequence"]):
        _abort_msg(
            "Sequence length of SQ line and sequence data differ for "
            + f"UniProtKB entry '{up_ac}': {data['up_seqlen']} != "
            + f"{len(data['up_sequence'])}"
        )
    _check_sequence(data["up_ac"], data["up_sequence"])

    if "up_id" not in data:
        _abort_msg(f"No ID found for UniProtKB entry '{up_ac}'.")
    if "up_ncbi_taxid" not in data:
        _abort_msg(f"No NCBI taxonomy ID found for UniProtKB entry '{up_ac}'.")
    if len(data["up_organism"]) == 0:
        _abort_msg(f"No organism species found for UniProtKB entry '{up_ac}'.")

    return data
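

# For reference, the UniProtKB txt lines _fetch_upkb_entry() consumes look
# roughly like this (shortened, illustrative values):
#
#   ID   CYC_BOVIN               Reviewed;         104 AA.
#   DT   03-OCT-2012, sequence version 2.
#   OS   Bos taurus (Bovine).
#   OX   NCBI_TaxID=9913;
#   SQ   SEQUENCE   104 AA;  11572 MW;  9A2B2A2CA7F5D82C CRC64;
#        MGDVEKGKKI FIMKCSQCHT VEKGGKHKTG PNLHGLFGRK TGQAPGFTYT DANKNKGITW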


def _get_upkb_for_sequence(sqe, up_ac):
    """Get UniProtKB entry data for a given sequence."""
    up_data = _fetch_upkb_entry(up_ac)
    if sqe != up_data["up_sequence"]:
        raise RuntimeError(
            f"Sequences not equal from file: {sqe}, from UniProtKB: "
            + f"{up_data['up_sequence']}"
        )

    return up_data


def _get_entities(pdb_file, up_acs):
    """Gather data for the mmCIF (target) entities."""
    entities = []
    ost_ent = io.LoadPDB(pdb_file)
    # Note: chains are expected in the same order as the UniProtKB ACs taken
    # from the model directory name.
    for i, chn in enumerate(ost_ent.chains):
        cif_ent = {}
        sqe = _get_sequence(chn)
        upkb = _get_upkb_for_sequence(sqe, up_acs[i])
        cif_ent["cif_sequence"] = sqe
        cif_ent.update(upkb)
        entities.append(cif_ent)

    return entities
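

# Each entity dictionary produced by _get_entities() carries the PDB-derived
# sequence plus the UniProtKB data, i.e. keys along these lines (values are
# illustrative placeholders):
#
#   {
#       "cif_sequence": "-AB-C...",   # from _get_sequence()
#       "up_ac": "P12345",            # plus up_id, up_ncbi_taxid, up_organism,
#       "up_crc64": "...",            # up_sequence, up_seqlen, up_last_mod
#       "up_isoform": None,           # and up_isoform
#   }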


def _store_as_modelcif(interaction_name, data_json, file_prfx):
    """Mix all the data into a ModelCIF file."""
    # create target references, ...
    for cif_ent in data_json["target_entities"]:
        # 'cif_sequence'
        ref = modelcif.reference.UniProt(
            cif_ent["up_id"],
            cif_ent["up_ac"],
            # careful: alignments are just that easy because of equal
            # sequences!
            align_begin=1,
            align_end=cif_ent["up_seqlen"],
            isoform=cif_ent["up_isoform"],
            ncbi_taxonomy_id=cif_ent["up_ncbi_taxid"],
            organism_scientific=cif_ent["up_organism"],
            sequence_version_date=cif_ent["up_last_mod"],
            sequence_crc64=cif_ent["up_crc64"],
        )
        # ToDo: 'ref' is only created here so far; it still needs to be
        #       attached to the corresponding target entity.

    # create system to gather all the data
    system = modelcif.System(
        title=data_json["title"],
        id=interaction_name.upper(),
        model_details=data_json["model_details"],
    )

    # audit_authors
    system.authors.extend(data_json["audit_authors"])

    # write modelcif System to file
    with open(f"{file_prfx}.cif", "w", encoding="utf8") as mmcif_fh:
        modelcif.dumper.write(mmcif_fh, [system])
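

# A minimal sketch of the input _store_as_modelcif() expects (placeholder
# values, not real data):
#
#   data_json = {
#       "title": "struct.title",
#       "model_details": "struct.pdbx_model_details",
#       "audit_authors": ["Foo B", "Bar F"],
#       "target_entities": [...],  # as returned by _get_entities()
#   }
#   _store_as_modelcif("P12345-Q67890", data_json, "P12345-Q67890/model01")
#
# which writes 'P12345-Q67890/model01.cif'.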


def _create_interaction_json():
    """Create a dictionary (mimicking JSON) that contains data which is the
    same for all models."""
    data = {}
    data["audit_authors"] = _get_audit_authors()
    data["model_details"] = _get_model_details()
    data["title"] = _get_title()

    return data


def _create_model_json(data, pdb_file, up_acs):
    """Create a dictionary (mimicking JSON) that contains all the data."""
    data["target_entities"] = _get_entities(pdb_file, up_acs)

    return data


def _main():
    """Run as script."""
    opts = _parse_args()

    interaction = os.path.split(opts.model_dir)[-1]
    print(f"Working on {interaction}...")

    # get UniProtKB ACs from directory name
    up_acs = interaction.split("-")

    _check_interaction_extra_files_present(opts.model_dir)

    mdlcf_json = _create_interaction_json()

    # iterate model directory
    for fle in os.listdir(opts.model_dir):
        # iterate PDB files
        if not fle.endswith(".pdb"):
            continue
        file_prfx, uid = _check_model_extra_files_present(opts.model_dir, fle)
        fle = os.path.join(opts.model_dir, fle)
        # gather data into JSON-like structure
        _create_model_json(mdlcf_json, fle, up_acs)
        _store_as_modelcif(uid, mdlcf_json, file_prfx)
        # ToDo [internal]: wipe data or is it overwritten in mdlcf_json?

    print(f"... done with {opts.model_dir}.")


if __name__ == "__main__":
    _main()
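
# Typical invocation (script and directory names are illustrative):
#
#   $ python translate2modelcif.py P12345-Q67890
#   Working on P12345-Q67890...
#   ... done with P12345-Q67890.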
# LocalWords: Xabi argparse args ArgumentParser formatter dir str metavar os
# LocalWords: RawDescriptionHelpFormatter ACs sys msg nAborting stderr acs
# LocalWords: fle cnfg config json pdb prfx scrs endswith modelcif uid pdbx
# LocalWords: struct cif utf mmcif fh datetime ost io ToDo chn lst rn idx aa
# LocalWords: sqe olc ACDEFGHIKLMNPQRSTVWY RuntimeError upkb txt pylint iter
# LocalWords: rspns unicode startswith sline len elif NCBI TaxID ncbi taxid
# LocalWords: seqlen crc ISOFORM DT dt flds isoforms isoform ent LoadPDB
# LocalWords: mdlcf