diff --git a/projects/2024-12-ma-denv/README.md b/projects/2024-12-ma-denv/README.md
new file mode 100644
index 0000000000000000000000000000000000000000..d925f5459e3a2997f144b041ec8eab643833e063
--- /dev/null
+++ b/projects/2024-12-ma-denv/README.md
@@ -0,0 +1,22 @@
+# Modelling AVITHRAID (ToDo: change to real title)
+
+[Link to project in ModelArchive](https://www.modelarchive.org/doi/10.5452/ma-chfwm) (ToDo: update to final ID as requested by user) (incl. background on project itself and link to the publication)
+
+Modelling setup:
+- ToDo: finish
+- SWISS-MODEL web server
+- figure out: automodel or interactive
+
+Input files for conversion:
+- SWISS-MODEL ModelCIF files
+- ToDo: does the script require all files in one directory, or does it run
+  with all files as arguments? (see the example call below)
+
+Special features here:
+- ToDo: fill this section
+
+Content:
+- ToDo: fill with files included in this directory - scripts, example data...
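+
+Example conversion call (a sketch mirroring the usage example in
+translate2modelcif.py; decompress the example files first, final wording is
+still ToDo):
+
+```
+python translate2modelcif.py --unp-json example_unp_data.json \
+    example_swissmodelcif.cif.gz example_index.csv done
+```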
+
+<!--  LocalWords:  ToDo AVITHRAID
+ -->
diff --git a/projects/2024-12-ma-denv/example_index.csv.gz b/projects/2024-12-ma-denv/example_index.csv.gz
new file mode 100755
index 0000000000000000000000000000000000000000..23d5092b7f3babf8a77f1c6d7fa886c7adaed01c
Binary files /dev/null and b/projects/2024-12-ma-denv/example_index.csv.gz differ
diff --git a/projects/2024-12-ma-denv/example_result.cif.gz b/projects/2024-12-ma-denv/example_result.cif.gz
new file mode 100644
index 0000000000000000000000000000000000000000..1d078534efb68ebe6516ac4f3f61fe0e7a050c6a
Binary files /dev/null and b/projects/2024-12-ma-denv/example_result.cif.gz differ
diff --git a/projects/2024-12-ma-denv/example_swissmodelcif.cif.gz b/projects/2024-12-ma-denv/example_swissmodelcif.cif.gz
new file mode 100755
index 0000000000000000000000000000000000000000..f76354699f4fe37b517a5ffe546a363b1071f9de
Binary files /dev/null and b/projects/2024-12-ma-denv/example_swissmodelcif.cif.gz differ
diff --git a/projects/2024-12-ma-denv/example_unp_data.json.gz b/projects/2024-12-ma-denv/example_unp_data.json.gz
new file mode 100644
index 0000000000000000000000000000000000000000..35af190837c95979968b3e726939782ce26d4d9b
Binary files /dev/null and b/projects/2024-12-ma-denv/example_unp_data.json.gz differ
diff --git a/projects/2024-12-ma-denv/macif.py b/projects/2024-12-ma-denv/macif.py
new file mode 100644
index 0000000000000000000000000000000000000000..da5b9c46cd2e8a773d717bb46fc38a23c6571e14
--- /dev/null
+++ b/projects/2024-12-ma-denv/macif.py
@@ -0,0 +1,186 @@
+"""mmCIF editing for ModelArchive depositions.
+
+Supposed to look like gemmi.cif but with some convenience on top. Not meant
+for high performance but to help prepare ModelCIF files for an MA deposition.
+Use for 'one time' jobs, not as a frequently run tool in a service/pipeline."""
+
+import gzip
+
+# For whatever reason, 'no-name-in-module' can not be silenced by config atm
+# pylint: disable=no-name-in-module
+from gemmi import cif
+
+# pylint: enable=no-name-in-module
+import gemmi
+
+
+def _gemmi_quote(value):
+    """Quote string values when necessary."""
+    if (
+        isinstance(value, str)
+        and " " in value
+        and not (value.startswith("'") and value.endswith("'"))
+        and not (value.startswith('"') and value.endswith('"'))
+    ):
+        return cif.quote(value)
+    return value
+
+
+class MABlock:
+    """gemmi.cif wrapper that skips the document level and jumps right into
+    gemmi.cif.Block objects. You have all the gemmi.cif.Block functionality
+    available plus our own convenience functions on top."""
+
+    def __init__(self, model_data):
+        """Initialise a single Block from a file."""
+        self.source = model_data
+        self.doc = cif.read(model_data)
+        self.block = self.doc.sole_block()
+
+        self._targets = None
+        self._polymer_targets = None
+
+    def __getattr__(self, name):
+        """If an attribute is not found, try self.block before giving up."""
+        # The catch here is: when asking for self.foo,
+        # self.__getattribute__(self, "foo") is executed first. If "foo" is
+        # not found in self, self.__getattr__(self, "foo") is called. So here
+        # we already know that "foo" is not there and we can check the
+        # original block.
+        if hasattr(self.block, name):
+            return getattr(self.block, name)
+        raise AttributeError(
+            f"'{type(self).__name__}' object has no attribute '{name}'"
+        )
+
+    @property
+    def targets(self):
+        """Info about targets."""
+        if self._targets is not None:
+            return self._targets
+        self._targets = {}
+        table = self.find("_ma_target_entity.", ["entity_id"])
+        for row in table:
+            if row["entity_id"] in self._targets:
+                raise RuntimeError(
+                    f"Target with entity_id '{row['entity_id']}' is duplicated."
+                )
+            self._targets[row["entity_id"]] = {
+                "entity_id": row["entity_id"],  # makes life easier for singles
+                "sequence": self.get_sequence(row["entity_id"]),
+            }
+        table = self.find("_entity.", ["id", "type"])
+        for row in table:
+            self._targets[row["id"]]["type"] = row["type"]
+
+        return self._targets
+
+    @property
+    def polymer_targets(self):
+        """Only targets of entity type 'polymer'."""
+        if self._polymer_targets is not None:
+            return self._polymer_targets
+        self._polymer_targets = []
+        for target in self.targets.values():
+            if target["type"] == "polymer":
+                self._polymer_targets.append(target)
+
+        return self._polymer_targets
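+
+    # Illustration (values depend on the input file, sequence shortened):
+    # for a single polymer target, `targets` resolves to something like
+    #   {"1": {"entity_id": "1", "sequence": "MRCIG...", "type": "polymer"}}
+    # and `polymer_targets` lists just the 'polymer' entries of that dict.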
+
+    def find(self, name, columns):
+        """Get a table with defined columns. Throws an exception if the table
+        is not found."""
+        table = self.block.find(name, columns)
+        if len(table) == 0:
+            raise RuntimeError(
+                f"""Table '{name}' with columns '{"', '".join(columns)}' """
+                + "not found."
+            )
+
+        return table
+
+    def get_sequence(self, entity):
+        """Get the sequence of a 'polymer' entity. `entity` is the numeric ID
+        of the entity.
+        Reading sequences is inefficient atm, it reads the whole table for
+        every sequence w/o any caching."""
+        table = self.find("_entity_poly_seq.", ["entity_id", "num", "mon_id"])
+        sequence = ""
+        num = 0
+        for row in table:
+            if row["entity_id"] != entity:
+                continue
+            num += 1
+            assert int(row["num"]) == num
+            sequence += gemmi.find_tabulated_residue(
+                row["mon_id"]
+            ).one_letter_code
+
+        return sequence
+
+    def write_file(self, filename, compress=False, style=cif.Style.Simple):
+        """Write ModelCIF file to disk, compress upon request.
+        Will compress anyway if the file name ends with '.gz'."""
+        if compress or filename.endswith(".gz"):
+            if not filename.endswith(".gz"):
+                filename += ".gz"
+            with gzip.open(filename, mode="wt", compresslevel=9) as gfh:
+                gfh.write(self.doc.as_string(style))
+        else:
+            self.doc.write_file(filename, style)
+
+    def add_to_category(self, category, match=None, **kwargs):
+        """Add item values to a category.
+        Keyword arguments are reserved for item names."""
+        if category[0] != "_":
+            category = "_" + category
+        if category[-1] != ".":
+            category += "."
+        items = list(kwargs.keys())
+        row = None
+        if match is not None:
+            table = self.find(category, items + [match[0]])
+            for row in table:
+                if row[match[0]] == match[1]:
+                    break
+            if row is None:
+                raise RuntimeError(
+                    f"No item {match[0]}=={match[1]} found in category "
+                    + f"{category}."
+                )
+        else:
+            table = self.find(category, items)
+            assert len(table) == 1
+            row = table[0]
+        for itm, val in kwargs.items():
+            if row[itm] not in [".", "?"]:
+                print(
+                    f"  replacing '{cif.as_string(row[itm])}' with "
+                    + f"'{val}' ({itm})"
+                )
+            row[itm] = _gemmi_quote(val)
+
+    def add_category(self, category, after=None, **kwargs):
+        """Add a new category to the block with only one set of values - one
+        row, thinking of categories as tables. kwargs are reserved for item/
+        value pairs. after is a special keyword parameter to locate the new
+        category inside the block."""
+        if not category.startswith("_"):
+            category = "_" + category
+        # handle quoting
+        for values in kwargs.values():
+            for i, val in enumerate(values):
+                values[i] = _gemmi_quote(val)
+        self.block.set_mmcif_category(category, kwargs, raw=True)
+
+        if after is None:
+            return
+        if not after.startswith("_"):
+            after = "_" + after
+        if not after.endswith("."):
+            after += "."
+        table = self.block.find_mmcif_category(after)
+        idx = self.block.get_index(table.tags[-1])
+        # be careful with move_item: loops are 1 item and move as a whole,
+        # single-line values move per item/value pair.
+        self.block.move_item(-1, idx + 1)
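+
+
+# Minimal usage sketch (file names are placeholders and the touched categories
+# must already exist in the input; see translate2modelcif.py for real use):
+#
+#   block = MABlock("model.cif.gz")
+#   block.add_to_category("struct", title="Some title")
+#   block.write_file("annotated.cif.gz", compress=True)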
diff --git a/projects/2024-12-ma-denv/translate2modelcif.py b/projects/2024-12-ma-denv/translate2modelcif.py
new file mode 100644
index 0000000000000000000000000000000000000000..9fca2a7d8c4ee2ee5c5adfc088f3511170d7b0a2
--- /dev/null
+++ b/projects/2024-12-ma-denv/translate2modelcif.py
@@ -0,0 +1,278 @@
+# -*- coding: utf-8 -*-
+
+"""Check & extend SWISS-MODEL models for ma-denv
+
+Example for running:
+python translate2modelcif.py --unp-json example_unp_data.json \
+                             example_swissmodelcif.cif.gz \
+                             example_index.csv \
+                             done
+
+This call uses the example data from this directory. You may need to decompress
+some of the files. The resulting './done/example_swissmodelcif.cif.gz' should
+be identical to './example_result.cif.gz'.
+
+Expected output in ./done for the example above:
+- example_swissmodelcif.cif.gz as ModelCIF file with extended annotation
+"""
+
+from datetime import date
+import argparse
+import csv
+import os
+import sys
+
+from macif import MABlock
+from uniprotkb import UniProtKBEntryCache
+
+
+################################################################################
+# GENERAL HELPER FUNCTIONS
+################################################################################
+def _abort_msg(msg, exit_code=1):
+    """Write error message and exit with exit_code."""
+    print(f"{msg}\nAborting.", file=sys.stderr)
+    sys.exit(exit_code)
+
+
+def _check_file(file_path):
+    """Make sure a file exists and is actually a file."""
+    if not os.path.exists(file_path):
+        _abort_msg(f"File not found: '{file_path}'.")
+    if not os.path.isfile(file_path):
+        _abort_msg(f"File path does not point to file: '{file_path}'.")
+
+
+def _check_folder(dir_path, create):
+    """Make sure a directory exists and is actually a directory."""
+    if not os.path.exists(dir_path):
+        if not create:
+            _abort_msg(f"Path not found: '{dir_path}'.")
+        os.makedirs(dir_path, exist_ok=True)
+    if not os.path.isdir(dir_path):
+        _abort_msg(f"Path does not point to a directory: '{dir_path}'.")
+
+
+def _check_opts_folder(dir_path, create=False):
+    """Remove a trailing '/' (return the fixed path) and check that the path
+    is valid."""
+    if dir_path.endswith("/"):
+        dir_path = dir_path[:-1]
+    _check_folder(dir_path, create=create)
+    return dir_path
+
+
+################################################################################
+
+
+################################################################################
+# DATA HANDLING
+################################################################################
+def _parse_args():
+    """Parse command line arguments."""
+    parser = argparse.ArgumentParser(
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+        description=__doc__,
+    )
+
+    parser.add_argument(
+        "modelcif",
+        type=str,
+        metavar="<INPUT MODELCIF FILE>",
+        help="Path to a SWISS-MODEL ModelCIF file provided by depositors.",
+    )
+    parser.add_argument(
+        "model_info_csv",
+        type=str,
+        metavar="<MODEL INFO CSV FILE>",
+        help="Path to a CSV file with information about each model. Format: "
+        + "<FILENAME>,<UNP AC>,<UNP ALIGNMENT START>,<UNP ALIGNMENT END>,"
+        + "<ENTITY DESCRIPTION>,<MODEL TITLE>,<MODEL DESCRIPTION>",
+    )
+    parser.add_argument(
+        "out_dir",
+        type=str,
+        metavar="<OUTPUT DIR>",
+        help="Path to directory to store results. The updated ModelCIF file "
+        + "will be stored in this directory. If the ModelCIF file already "
+        + "exists in the output directory, it will not be overwritten and "
+        + "an error is thrown.",
+    )
+    parser.add_argument(
+        "--unp-json",
+        type=str,
+        default="./unp_data.json",
+        help="UniProtKB entries data. Information for each UNP entry fetched "
+        + "will be stored here. Serves as a cache. To update (e.g. after a "
+        + "UNP release), just delete the file and it will be recreated.",
+    )
+    parser.add_argument(
+        "--compress",
+        "-c",
+        default=False,
+        action="store_true",
+        help="Compress ModelCIF file with gzip.",
+    )
+    parser.add_argument(
+        "--checks-only",
+        default=False,
+        action="store_true",
+        help="Only check for issues without producing ModelCIF files.",
+    )
+    opts = parser.parse_args()
+
+    # check input
+    _check_file(opts.modelcif)
+    _check_file(opts.model_info_csv)
+    _check_opts_folder(opts.out_dir, create=True)
+    # check if the ModelCIF file already exists in out_dir
+    out_file = os.path.join(
+        opts.out_dir,
+        f"{os.path.basename(opts.modelcif)}{'.gz' if opts.compress else ''}",
+    )
+    if os.path.exists(out_file):
+        _abort_msg(f"'{out_file}' already exists, will *not* be overwritten.")
+    # check if UNP is a file if it already exists
+    if os.path.exists(opts.unp_json):
+        _check_file(opts.unp_json)
+
+    return opts
+
+
+def _read_csv(mdl_file, mdl_info_csv):
+    """Get info of a specific model from the CSV file."""
+    mdl_file = os.path.basename(mdl_file)
+
+    mdl_info = {}
+    with open(mdl_info_csv, newline="", encoding="ascii") as csvh:
+        info_r = csv.reader(csvh)
+        for row in info_r:
+            assert len(row) == 7
+            if row[0].endswith(mdl_file):
+                # <UNP AC>
+                mdl_info["unp_ac"] = row[1]
+                # <UNP ALIGNMENT START>
+                mdl_info["unp_start"] = int(row[2])
+                # <UNP ALIGNMENT END>
+                mdl_info["unp_end"] = int(row[3])
+                # <ENTITY DESCRIPTION>
+                mdl_info["protein_desc"] = row[4]
+                # <MODEL TITLE>
+                mdl_info["entry_title"] = row[5]
+                # <MODEL DESCRIPTION>
+                mdl_info["entry_desc"] = row[6]
+                break
+    if len(mdl_info) == 0:
+        _abort_msg(f"Could not find '{mdl_file}' in '{mdl_info_csv}'.")
+
+    return mdl_info
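+
+
+# A hypothetical index row in the format described above (values invented for
+# illustration, the real ones come from the depositors' CSV):
+#   model_001.cif,P12345,1,495,Envelope protein E,DENV model,Monomer model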
+
+
+################################################################################
+
+
+################################################################################
+# HANDLE FULL DATA SET
+################################################################################
+def _update_modelcif(mdl_file, mdl_info, unp_json_file, out_dir, compress):
+    """Update a ModelCIF file with model info and verify UNP-related data.
+
+    Caution: this is for updates BEFORE deposition; it does not write a
+    CIF-style revision history for you!"""
+
+    block = MABlock(mdl_file)
+    assert len(block.polymer_targets) == 1
+    target = block.polymer_targets[0]
+    unp_cache = UniProtKBEntryCache(unp_json_file)
+    # print(mdl_info)
+    unp, db_aln, seq_aln = unp_cache.match_sequence(
+        mdl_info["unp_ac"],
+        target["sequence"],
+        mdl_info["unp_start"],
+        mdl_info["unp_end"],
+    )
+
+    # update ModelCIF data
+    block.add_to_category(
+        "struct",
+        title=mdl_info["entry_title"],
+        pdbx_model_details=mdl_info["entry_desc"],
+    )
+    block.add_to_category(
+        "entity",
+        pdbx_description=mdl_info["protein_desc"],
+        match=("id", target["entity_id"]),
+    )
+    # Update ModelCIF files with UniProtKB info
+    struct_ref_id = "1"
+    block.add_category(
+        "struct_ref",
+        id=[struct_ref_id],
+        entity_id=[target["entity_id"]],
+        db_name=["UNP"],
+        db_code=[unp.unp_id],
+        pdbx_db_accession=[unp.unp_ac],
+        pdbx_align_begin=[mdl_info["unp_start"]],
+        pdbx_seq_one_letter_code=[unp.unp_seq],
+        details=[unp.unp_details_full],
+        after="entity",
+    )
+    block.add_category(
+        "struct_ref_seq",
+        align_id=["1"],
+        ref_id=[struct_ref_id],
+        seq_align_beg=[seq_aln[0]],
+        seq_align_end=[seq_aln[1]],
+        db_align_beg=[db_aln[0]],
+        db_align_end=[db_aln[1]],
+        after="struct_ref",
+    )
+    block.add_category(
+        "ma_target_ref_db_details",
+        target_entity_id=[target["entity_id"]],
+        db_name=["UNP"],
+        db_name_other_details=[False],
+        db_code=[unp.unp_id],
+        db_accession=[unp.unp_ac],
+        seq_db_isoform=[False],
+        seq_db_align_begin=[db_aln[0]],
+        seq_db_align_end=[db_aln[1]],
+        ncbi_taxonomy_id=[unp.ncbi_taxid],
+        organism_scientific=[unp.organism_species],
+        seq_db_sequence_version_date=[date.isoformat(unp.last_seq_change)],
+        seq_db_sequence_checksum=[unp.unp_crc64],
+        after="struct_ref_seq",
+    )
+    # write, if requested write compressed
+    if out_dir is not None:
+        block.write_file(
+            os.path.join(out_dir, os.path.basename(mdl_file)), compress
+        )
+
+
+################################################################################
+
+
+def _main():
+    """Run as script."""
+    opts = _parse_args()
+
+    # read model info from CSV
+    mdl_info = _read_csv(opts.modelcif, opts.model_info_csv)
+    print(f"Working on model {opts.modelcif}...")
+    _update_modelcif(
+        opts.modelcif,
+        mdl_info,
+        opts.unp_json,
+        opts.out_dir if not opts.checks_only else None,
+        opts.compress,
+    )
+    print(f"... done with model {opts.modelcif}.")
+
+    # TEST: to judge res. needed on cluster
+    # import resource
+    # print("mem", resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1000)
+
+
+if __name__ == "__main__":
+    _main()
+
+# LocalWords:  gemmi
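+
+# Checking a model without writing any output (sketch, using the example data
+# from this directory):
+#   python translate2modelcif.py --checks-only --unp-json example_unp_data.json \
+#       example_swissmodelcif.cif.gz example_index.csv done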
+""" + +# Copyright (c) 2023, SIB - Swiss Institute of Bioinformatics and +# Biozentrum - University of Basel +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +from datetime import datetime +import os +import requests + +try: + # pylint: disable="bare-except" + import ujson as json +except: + import json + +import parasail + +# Name of UniProtKB as _ma_target_ref_db_details.db_name in a ModelCIF file, +# but 'pythonified' (no leading '_', no '.'). +MA_TARGET_REF_DB_DETAILS_DB_NAME = "UNP" + + +def translate_upkb_date_string(date_string): + """UniProtKB uses 3-letter month codes, which do not fly in other languages + (locales) than English, e.g. if the locale is set to de_DE, strptime() + expects 'MAI' instead of 'MAY'... switching the locale temporarily is also + not so easy (threads, not setting it back upon exceptions...). Hence, we + make the month a numeric value, here. + """ + for i, mon in enumerate( + [ + "JAN", + "FEB", + "MAR", + "APR", + "MAY", + "JUN", + "JUL", + "AUG", + "SEP", + "OCT", + "NOV", + "DEC", + ], + start=1, + ): + if mon in date_string: + return date_string.replace(mon, f"{i:02}") + + raise RuntimeError( + "Unrecognised UniProtKB date string found: " + f"'{date_string}'." + ) + + +class UniProtKBEntry: + """Deal with UniProtKB entries.""" + + # We ignore PEP8 on the number of attributes per class, here. The objective + # of UniProtKBEntry is to... represent a UniProtKB entry with all its + # data/ attributes. By the nature of an entry's meta-data, breaking it up + # into sub-classes seems counter-intuitive. + # Since this is more a data class than a functional class, we allow having + # too few public methods for now. + # pylint: disable=too-many-instance-attributes, too-few-public-methods + + def __init__(self, unp_ac, entry_version=None, json_data=None): + """Create a new UniProtKB entry object. + + UniProtKB API will be queried immediately on creation. + + :param unp_ac: Accession code of the UniProtKB entry to be fetched. + :type: unp_ac: :class:`str` + :param entry_version: Version of the UniPrtoKB entry to be fetched (not + to be mixed up with the UniProtKB release). + :type entry_version: :class:`str` or :class:`int` + :param json_data: Init object from JSON object, ignores unp_ac. 
+
+
+class UniProtKBEntry:
+    """Deal with UniProtKB entries."""
+
+    # We ignore PEP8 on the number of attributes per class, here. The
+    # objective of UniProtKBEntry is to... represent a UniProtKB entry with
+    # all its data/ attributes. By the nature of an entry's meta-data,
+    # breaking it up into sub-classes seems counter-intuitive.
+    # Since this is more a data class than a functional class, we allow having
+    # too few public methods for now.
+    # pylint: disable=too-many-instance-attributes, too-few-public-methods
+
+    def __init__(self, unp_ac, entry_version=None, json_data=None):
+        """Create a new UniProtKB entry object.
+
+        The UniProtKB API will be queried immediately on creation.
+
+        :param unp_ac: Accession code of the UniProtKB entry to be fetched.
+        :type unp_ac: :class:`str`
+        :param entry_version: Version of the UniProtKB entry to be fetched
+               (not to be mixed up with the UniProtKB release).
+        :type entry_version: :class:`str` or :class:`int`
+        :param json_data: Init object from a JSON object, ignores unp_ac.
+        :type json_data: :class:`dict`
+        """
+        if json_data is None:
+            self.unp_ac = unp_ac
+            # filled by self._fetch()
+            self.entry_status = None
+            self.entry_version = (
+                int(entry_version) if entry_version is not None else None
+            )
+            self.first_appearance = None
+            self.last_change = None
+            self.last_seq_change = None
+            self.ncbi_taxid = None
+            self.organism_species = ""
+            self.seq_version = None
+            self.seqlen = None
+            self.unp_crc64 = None
+            self.unp_details_full = None
+            self.unp_id = None
+            self.unp_seq = ""
+            self._fetch()
+            assert len(self.unp_seq) == self.seqlen
+        else:
+            self.entry_status = json_data["entry_status"]
+            self.entry_version = int(json_data["entry_version"])
+            self.first_appearance = (
+                datetime.fromisoformat(json_data["first_appearance"])
+                if json_data["first_appearance"] is not None
+                else None
+            )
+            self.last_change = (
+                datetime.fromisoformat(json_data["last_change"])
+                if json_data["last_change"] is not None
+                else None
+            )
+            self.last_seq_change = (
+                datetime.fromisoformat(json_data["last_seq_change"])
+                if json_data["last_seq_change"] is not None
+                else None
+            )
+            self.ncbi_taxid = json_data["ncbi_taxid"]
+            self.organism_species = json_data["organism_species"]
+            self.seq_version = json_data["seq_version"]
+            self.seqlen = json_data["seqlen"]
+            self.unp_ac = json_data["ac"]
+            self.unp_crc64 = json_data["crc64"]
+            self.unp_details_full = json_data["details_full"]
+            self.unp_id = json_data["id"]
+            self.unp_seq = json_data["seq"]
+
+    def __str__(self):
+        """Print object as string."""
+        return (
+            f"<{__name__}.{type(self).__name__} AC={self.unp_ac} "
+            + f"version={self.entry_version}>"
+        )
+
+    def _parse_id_line(self, line):
+        """Parse a UniProtKB TXT format's ID line.
+
+        Should support some older format versions, too."""
+        sline = line.split()
+        if len(sline) != 5:
+            # Some old formats that are easy to fix
+            if len(sline) == 6 and sline[3] == "PRT;":
+                sline.pop(3)
+            else:
+                raise RuntimeError(
+                    "ID line not conforming to 'ID   EntryName Status; "
+                    + f"SequenceLength.', found: {line}"
+                )
+        self.unp_id = sline[1]
+        self.entry_status = sline[2][:-1].upper()
+        self.seqlen = int(sline[3])
+
+    def _parse_dt_line(self, line):
+        """Parse a UniProtKB TXT format's DT line.
+
+        Should support some older format versions, too."""
+        sline = line.split()
+        sline[1] = translate_upkb_date_string(sline[1])
+        if sline[2] == "(Rel.":  # old format
+            if sline[4] == "Created)":
+                self.first_appearance = datetime.strptime(sline[1], "%d-%m-%Y")
+                self.entry_version = int(sline[3][:-1])
+            elif sline[5] == "sequence":
+                self.last_seq_change = datetime.strptime(sline[1], "%d-%m-%Y")
+                self.seq_version = int(sline[3][:-1])
+            elif sline[5] == "annotation":
+                self.last_change = datetime.strptime(sline[1], "%d-%m-%Y")
+            return
+        if sline[2] == "integrated":
+            self.first_appearance = datetime.strptime(sline[1], "%d-%m-%Y,")
+        elif sline[2] == "sequence":
+            self.last_seq_change = datetime.strptime(sline[1], "%d-%m-%Y,")
+            self.seq_version = int(sline[4][:-1])
+        elif sline[2] == "entry":
+            self.last_change = datetime.strptime(sline[1], "%d-%m-%Y,")
+            self.entry_version = int(sline[4][:-1])
+
+    def _parse_de_line(self, line):
+        """Parse a UniProtKB TXT format's DE line(s)."""
+        sline = line.split()
+        if sline[1] == "RecName:":
+            if self.unp_details_full is None:
+                if sline[2].startswith("Full="):
+                    self.unp_details_full = sline[2][len("Full=") :]
+                    for i in sline[3:]:
+                        if i.startswith("{"):
+                            break
+                        self.unp_details_full += f" {i}"
+                        if self.unp_details_full.endswith(";"):
+                            self.unp_details_full = self.unp_details_full[:-1]
+                            break
+
+    def _parse_os_line(self, line):
+        """Parse a UniProtKB TXT format's OS line(s)."""
+        osl = len("OS   ")
+        if line[-1] == ".":
+            self.organism_species += line[osl:-1]
+        else:
+            # keep the final character on continuation lines, they wrap
+            # mid-name
+            self.organism_species += line[osl:] + " "
+
+    def _parse_ox_line(self, line):
+        """Parse a UniProtKB TXT format's OX line."""
+        # Taxonomy codes only come from NCBI atm.
+        sline = line.split("=")
+        self.ncbi_taxid = sline[-1][:-1]
+        self.ncbi_taxid = self.ncbi_taxid.split()[0]
+
+    def _parse_sq_line(self, line):
+        """Parse a UniProtKB TXT format's SQ line."""
+        sline = line.split()
+        self.unp_crc64 = sline[6]
+
+    def _parse_sequence(self, line):
+        """Parse the sequence out of the UniProtKB TXT format."""
+        sline = line.split()
+        self.unp_seq += "".join(sline)
+
+    def _fetch(self):
+        """Retrieve information for a single UniProtKB entry."""
+        if self.entry_version is None:
+            query_url = f"https://rest.uniprot.org/uniprotkb/{self.unp_ac}.txt"
+        else:
+            query_url = (
+                f"https://rest.uniprot.org/unisave/{self.unp_ac}?format=txt&"
+                + f"versions={self.entry_version}"
+            )
+
+        rspns = requests.get(query_url, timeout=180)
+        if not rspns.ok:
+            raise RuntimeError(
+                f"UniProtKB entry with AC '{self.unp_ac}' not retrieved for "
+                + f"URL '{query_url}'"
+            )
+        for line in rspns.iter_lines(decode_unicode=True):
+            # Check here to learn about UniProtKB's flat file format:
+            # https://web.expasy.org/docs/userman.html
+            # ID   EntryName Status; SequenceLength.
+            if line.startswith("ID   "):
+                self._parse_id_line(line)
+            # DT   DD-MMM-YYYY, integrated into UniProtKB/database_name.
+            # DT   DD-MMM-YYYY, sequence version x.
+            # DT   DD-MMM-YYYY, entry version x.
+            elif line.startswith("DT   "):
+                self._parse_dt_line(line)
+            # DE   RecName: Full=Highly reducing ... tstA {ECO:...|PubMed:...};
+            # DE            Short=HR-PKS phiA {ECO:0000303|PubMed:26558485};
+            # DE            EC=2.3.1.- {ECO:0000269|PubMed:26558485};
+            # DE   AltName: Full=Phomoidride ... protein A {ECO:...|PubMed:...};
+            elif line.startswith("DE   "):
+                self._parse_de_line(line)
+            # OS   Avian leukosis RSA (RSV-SRA) (Rous sarcoma virus (strain
+            # OS   Schmidt-Ruppin A)).
+            elif line.startswith("OS   "):
+                self._parse_os_line(line)
+            # OX   Taxonomy_database_Qualifier=Taxonomic code;
+            elif line.startswith("OX   NCBI_TaxID="):
+                self._parse_ox_line(line)
+            # SQ   SEQUENCE   3392 AA;  378905 MW;  BBD894175578E164 CRC64;
+            elif line.startswith("SQ   "):
+                self._parse_sq_line(line)
+            # Sequence data is stored special, w/o a line code and multiline
+            elif line.startswith("     "):
+                self._parse_sequence(line)
+            # print(line)
+ elif line.startswith("DE "): + self._parse_de_line(line) + elif line.startswith("OS "): + self._parse_os_line(line) + # OX Taxonomy_database_Qualifier=Taxonomic code; + elif line.startswith("OX NCBI_TaxID="): + self._parse_ox_line(line) + # SQ SEQUENCE 3392 AA; 378905 MW; BBD894175578E164 CRC64; + elif line.startswith("SQ "): + self._parse_sq_line(line) + # Seqeunce is stored special, w/o prefix and multiline + elif line.startswith(" "): + self._parse_sequence(line) + # print(line) + + def to_json(self): + """Return entry as JSON object.""" + return { + "ac": self.unp_ac, + "entry_version": self.entry_version, + "organism_species": self.organism_species, + "entry_status": self.entry_status, + "first_appearance": ( + datetime.isoformat(self.first_appearance) + if self.first_appearance is not None + else None + ), + "last_change": ( + datetime.isoformat(self.last_change) + if self.last_change is not None + else None + ), + "last_seq_change": ( + datetime.isoformat(self.last_seq_change) + if self.last_seq_change is not None + else None + ), + "ncbi_taxid": self.ncbi_taxid, + "seq_version": self.seq_version, + "seqlen": self.seqlen, + "details_full": self.unp_details_full, + "id": self.unp_id, + "crc64": self.unp_crc64, + "seq": self.unp_seq, + } + + +class UniProtKBEntryCache: + """Cached retrieval of UniProtKB entries. + + To avoid calling the UniProtKB API for the same UniProtKB AC multiple times, + use this cache. Also helps with keeping code cleaner since you do not need + to carry UniProtKBEntry instances around, just call the cached entry. + + Be careful about UniProtKB entry versions. In theory, when specifying no + version for the cache, the real entry may change at UniProtKB while + running... but this is also an issue when loading the entry live from + UniProtKB multiple times. + + Also be aware that the cache has no size restrictions, it does not get swept + over its whole lifetime.""" + + # The cache serves as a data-dump, storing and handing out instances. No + # need for a lot of public methods, so we ignore PEP, here. + # pylint: disable=too-few-public-methods + def __init__(self, json_cache_file=None): + """Set up the cache.""" + self._cache = {} + self._cache_file = json_cache_file + + # try to fill the cache from file + if ( + self._cache_file is not None + and os.path.exists(self._cache_file) + and os.stat(self._cache_file).st_size != 0 + ): + with open(self._cache_file, encoding="utf8") as jfh: + self._cache = self._from_json(json.load(jfh)) + + def get(self, unp_ac, entry_version=None): + """Get an UniProtKBEntry from the cache. 
+
+    def to_json(self):
+        """Turn the cache into a JSON object."""
+        data = {}
+        for acc, versions in self._cache.items():
+            data[acc] = {}
+            for version, entry in versions.items():
+                data[acc][version] = entry.to_json()
+
+        return data
+
+    def _from_json(self, data):
+        """Used to initialise the cache from a JSON object."""
+        cache = {}
+        for acc, versions in data.items():
+            cache[acc] = {}
+            for version, entry in versions.items():
+                version = int(version) if version != "null" else None
+                cache[acc][version] = UniProtKBEntry(None, json_data=entry)
+
+        return cache
+
+    def match_sequence(self, unp_ac, sequence, start, end):
+        """Match a sequence with the sequence of a UNP entry.
+
+        As the UNP sequence can change over time, walk through the versions
+        until an exact match is found, or return the best match.
+
+        :param unp_ac: UniProtKB accession.
+        :type unp_ac: :class:`str`
+        :param sequence: Target sequence of the model.
+        :type sequence: :class:`str`
+        :param start: Start residue of the alignment, 1-based.
+        :type start: :class:`int`
+        :param end: End residue of the alignment, 1-based.
+        :type end: :class:`int`
+        """
+
+        # This function was first written for a bulk deposition that came with
+        # an alignment range in the UNP sequence and a UNP AC, but no UNP
+        # entry version. So this function works with this for now. Later use
+        # cases should be incorporated as they occur. E.g. having no range
+        # (start, end would be None) should search for the best alignment and
+        # use the range from this in the ModelCIF file.
+
+        def aln(unp, trg, start, end):
+            unp = unp[start:end]
+            # trg = parasail's query
+            # unp = parasail's ref
+            # Since we have a fixed range, we expect a global alignment, use
+            # parasail's NW variant.
+            alignment = parasail.nw_trace_scan_sat(
+                trg, unp, 5, 2, parasail.blosum62
+            )
+            # get aln boundaries - assuming a NW (global) alignment
+            db_aln_start = (
+                start + len(unp) - len(alignment.traceback.query.lstrip("-"))
+            )
+            db_aln_end = db_aln_start + len(
+                alignment.traceback.query.rstrip("-")
+            )
+            seq_aln_start = len(trg) - len(alignment.traceback.ref.lstrip("-"))
+            seq_aln_end = seq_aln_start + len(
+                alignment.traceback.query.replace("-", "")
+            )
+            # print("TRG", alignment.traceback.query)
+            # print("UNP", alignment.traceback.ref)
+            # print(f"TRG {seq_aln_start} {seq_aln_end}")
+            return (
+                db_aln_start + 1,
+                db_aln_end,
+                seq_aln_start + 1,
+                seq_aln_end,
+                alignment.traceback.ref,
+            )
+
+        entry = self.get(unp_ac)
+        # depositor has provided alignment range, so we only align that region
+        db_aln_start, db_aln_end, seq_aln_start, seq_aln_end, aln_unp = aln(
+            entry.unp_seq, sequence, start - 1, end
+        )
+        # print(
+        #     f"{entry.entry_version} - start: {start}/ {db_aln_start+1} end: "
+        #     + f"{end}/ {db_aln_end} len: {len(sequence)} "
+        #     + f"({end-start})"
+        # )
+        # Check that the alignment fits... make sure the aligned UNP sequence
+        # does not contain gaps; gaps in the UNP sequence would mean deleted
+        # residues in the target sequence.
+        if (
+            db_aln_start == start
+            and db_aln_end == end
+            and aln_unp.find("-") == -1
+        ):
+            return (
+                entry,
+                (db_aln_start, db_aln_end),
+                (seq_aln_start, seq_aln_end),
+            )
+        version = entry.entry_version
+        while version > 1:
+            version -= 1
+            entry = self.get(unp_ac, entry_version=version)
+            db_aln_start, db_aln_end, seq_aln_start, seq_aln_end, aln_unp = (
+                aln(entry.unp_seq, sequence, start - 1, end)
+            )
+            # print(
+            #     f"{version} - start: {start}/ {db_aln_start+1} end: {end}/ "
+            #     + f"{db_aln_end} len: {len(sequence)} ({end-start})"
+            # )
+            if (
+                db_aln_start == start
+                and db_aln_end == end
+                and aln_unp.find("-") == -1
+            ):
+                return (
+                    entry,
+                    (db_aln_start, db_aln_end),
+                    (seq_aln_start, seq_aln_end),
+                )
+        # print("FOUND", entry.unp_seq.find(sequence))
+        # if we end up here, there was no nice alignment
+        # ToDo: use _abort_msg here, once there is a proper Python module
+        raise RuntimeError(
+            f"Could not find a proper alignment in region {start}-{end} for "
+            + f"{unp_ac}."
+        )
+
+
+# LocalWords:  de strptime UniProtKBEntry
diff --git a/projects/README.template b/projects/README.template
index 338d14a6ef9a40c84a8758730c735d09a347cd5e..eb86ee4c9baea676ced99d0e6cc502de59ab7cc6 100644
--- a/projects/README.template
+++ b/projects/README.template
@@ -2,10 +2,10 @@
 [Link to project in ModelArchive](https://www.modelarchive.org/doi/10.5452/...)
 (incl. background on project itself and link to the publication)
 
-Input files for conversion:
+Modelling setup:
 ...
 
-Modelling setup:
+Input files for conversion:
 ...
 
 Special features here: