From 65ec1620ad5f937f2b86d6dc114781bd74e7acc5 Mon Sep 17 00:00:00 2001
From: B13nch3n <b13nch3n_01@theb-si.de>
Date: Mon, 9 May 2022 09:23:29 +0200
Subject: [PATCH] Add validation tool

---
 validation/Dockerfile                 | 165 +++++
 validation/entrypoint.sh              |  30 ++
 validation/get-mmcif-dict-versions.py | 157 ++++
 validation/validate-mmcif-file.py     | 687 ++++++++++++++++++++++++++
 4 files changed, 1039 insertions(+)
 create mode 100644 validation/Dockerfile
 create mode 100644 validation/entrypoint.sh
 create mode 100755 validation/get-mmcif-dict-versions.py
 create mode 100755 validation/validate-mmcif-file.py

diff --git a/validation/Dockerfile b/validation/Dockerfile
new file mode 100644
index 0000000..062c0c4
--- /dev/null
+++ b/validation/Dockerfile
@@ -0,0 +1,165 @@
+ARG VERSION_PYTHON="3.6.15"
+ARG VERSION_BASE_IMAGE="python:${VERSION_PYTHON}-alpine3.15"
+FROM ${VERSION_BASE_IMAGE}
+# We need to declare ARGs again which were declared before the build stage
+# (FROM directive), otherwise they won't be available in this stage.
+ARG VERSION_PYTHON
+ARG VERSION_BASE_IMAGE
+
+ARG VERSION_CPP_DICT_PACK="v2.500"
+ARG VERSION_PY_MMCIF="0.76"
+
+## Set up environment
+ENV MMCIF_DICTS_DIR="/usr/local/share/mmcif-dict-suite" \
+    SRC_DIR="/tmp" \
+    VERSION_CPP_DICT_PACK=${VERSION_CPP_DICT_PACK} \
+    VERSION_BASE_IMAGE=${VERSION_BASE_IMAGE} \
+    VERSION_PYTHON=${VERSION_PYTHON} \
+    VERSION_PY_MMCIF=${VERSION_PY_MMCIF} \
+    PYTHONUNBUFFERED=1 \
+    PYTHONDONTWRITEBYTECODE=1
+
+
+LABEL org.modelarchive.base-image="${VERSION_BASE_IMAGE}"
+LABEL org.modelarchive.cpp-dict-pack.version="${VERSION_CPP_DICT_PACK}"
+LABEL maintainer="Stefan Bienert <stefan.bienert@unibas.ch>"
+LABEL vendor1="Schwede Group (schwedelab.org)"
+LABEL vendor2="SIB - Swiss Institute of Bioinformatics (sib.swiss)"
+LABEL vendor3="Biozentrum - University of Basel (biozentrum.unibas.ch)"
+
+## Install the RCSB CPP Dict Suite (only the binaries we need)
+WORKDIR ${SRC_DIR}
+RUN set -eo pipefail; \
+    export DICT_PACK_SRC_DIR="${SRC_DIR}/cpp-dict-pack.git"; \
+    apk update; \
+    apk upgrade; \
+    apk add abuild binutils bison build-base cmake flex git gcc \
+            extra-cmake-modules tcsh; \
+    #
+    ## Install the RCSB mmCIF Dict Suite
+    git clone -b ${VERSION_CPP_DICT_PACK} \
+        --single-branch --recurse-submodules \
+        https://github.com/rcsb/cpp-dict-pack.git \
+        ${DICT_PACK_SRC_DIR}; \
+    mkdir ${DICT_PACK_SRC_DIR}/build; \
+    cd ${DICT_PACK_SRC_DIR}/build; \
+    cmake .. -DCMAKE_VERBOSE_MAKEFILE=ON; \
+    make; \
+    for cif_tool in CifCheck DictToSdb; do \
+        mv bin/${cif_tool} /usr/local/bin; \
+    done; \
+    cd ${SRC_DIR}; \
+    rm -r ${DICT_PACK_SRC_DIR}; \
+    #
+    ## Install the RCSB py-mmcif Python module
+    /usr/local/bin/python -m pip install --upgrade pip; \
+    /usr/local/bin/python -m pip install mmcif==${VERSION_PY_MMCIF} \
+                                         python-rapidjson; \
+    #
+    ## Clean up/ remove unnecessary stuff
+    apk del abuild binutils bison build-base cmake flex git gcc \
+            extra-cmake-modules tcsh; \
+    apk add libstdc++
+
+## Add a dedicated user for mmCIF file validation
+## MMCIF_USER_ID can be used to avoid file permission issues in development.
+ARG MMCIF_USER_ID=501
+RUN adduser -S -u ${MMCIF_USER_ID} mmcif-vldtr
+
+## Copy tools (already in use during dictionary SDB creation)
+COPY --chmod=755 get-mmcif-dict-versions.py \
+                 /usr/local/bin/get-mmcif-dict-versions
+
+
+## Create dictionaries for validating mmCIF files.
+## To rebuild dictionaries, rebuild the container with build argument
+## DICT_FETCH_DATE="<DATE>.n" so only the RUN command for building the
+## dictionaries is triggered. The ".n" should be an increasing number to
+## allow multiple builds on the same day, in case something goes wrong.
+## Dictionaries do not change that frequently, therefore we skip the hassle
+## of keeping them in an external volume.
+## To explore development versions of the MAX/mmCIF dictionary, right out of
+## the Git repo, build with USE_DICT_RELEASE="dev". Default is "master",
+## which loads from the master branch at https://github.com/ihmwg/ModelCIF.
+ARG DICT_FETCH_DATE="2022-05-02.1"
+ARG USE_DICT_RELEASE="master"
+ENV DICT_FETCH_DATE=${DICT_FETCH_DATE}
+ENV USE_DICT_RELEASE=${USE_DICT_RELEASE}
+LABEL org.modelarchive.dict-fetch-date="${DICT_FETCH_DATE}"
+LABEL org.modelarchive.dict_release="${USE_DICT_RELEASE}"
+WORKDIR ${SRC_DIR}
+RUN set -eo pipefail; \
+    apk add curl; \
+    export _DICT_DIR="${SRC_DIR}/mmcif_dicts"; \
+    export _DICT_URL="https://mmcif.wwpdb.org/dictionaries/ascii"; \
+    export _PATHSPEC="a24fcfa8d6c3ceb4b6f0676bcc341ac0cd24ff1f"; \
+    export _REPO_URL="https://raw.githubusercontent.com/ihmwg/ModelCIF/${_PATHSPEC}"; \
+    export _MA_DICT_URL="${_REPO_URL}/dist/mmcif_ma.dic"; \
+    mkdir ${_DICT_DIR}; \
+    mkdir ${MMCIF_DICTS_DIR}; \
+    cd ${_DICT_DIR}; \
+    #
+    ## Fetch the dictionary definition language
+    curl ${_DICT_URL}/mmcif_ddl.dic.gz -s -o mmcif_ddl.dic.gz; \
+    gunzip *.gz; \
+    #
+    ## Fetch the merged ModelCIF dictionary
+    curl ${_MA_DICT_URL} -s -L -o mmcif_ma.dic; \
+    #
+    ## Build the ModelCIF SDB
+    DictToSdb -ddlFile mmcif_ddl.dic \
+              -dictFile mmcif_ma.dic \
+              -dictSdbFile ${MMCIF_DICTS_DIR}/mmcif_ma.sdb; \
+    #
+    ## Fetch the stable PDBx/mmCIF dictionary
+    curl ${_DICT_URL}/mmcif_pdbx_v50.dic -s -o mmcif_pdbx_v50.dic; \
+    #
+    ## Build the PDBx/mmCIF SDB
+    DictToSdb -ddlFile mmcif_ddl.dic \
+              -dictFile mmcif_pdbx_v50.dic \
+              -dictSdbFile ${MMCIF_DICTS_DIR}/mmcif_pdbx_v50.dic.sdb; \
+    #
+    ## Get versions of ModelCIF & PDBx/mmCIF dictionaries
+    get-mmcif-dict-versions --parent-location ${_REPO_URL}/base/mmcif_pdbx_v50.dic \
+                            --child-location ${_MA_DICT_URL} \
+                            mmcif_ma.dic; \
+    mv mmcif_ma_version.json ${MMCIF_DICTS_DIR}/; \
+    #
+    ## Make SDBs readable and keep possible error logs from building them
+    mv *.log ${MMCIF_DICTS_DIR}/ 2>/dev/null || :; \
+    chmod o+r ${MMCIF_DICTS_DIR}/*; \
+    #
+    ## Clean up
+    cd ${SRC_DIR}; \
+    rm -r ${_DICT_DIR}; \
+    apk del curl
+
+
+COPY --chmod=755 entrypoint.sh /
+COPY --chmod=755 validate-mmcif-file.py /usr/local/bin/validate-mmcif-file
+
+# for development
+#RUN set -eo pipefail; \
+#    apk add bash emacs gcc build-base; \
+#    /usr/local/bin/python -m pip install pylint black; \
+#    apk del gcc build-base
+
+USER mmcif-vldtr
+
+ENTRYPOINT ["/entrypoint.sh"]
+
+# have tool ready
+# - entrypoint: validate... just runs validation, celery runs celery, CMD else
+# write Python to run & check mmCIF
+# - Note dictionary versions in the mmCIF file!
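+#
+# Example usage of the finished image (a sketch; the image tag, mount point
+# and file names are illustrative placeholders, not fixed values):
+#
+#   docker build -t mmcif-validate --build-arg DICT_FETCH_DATE="2022-05-02.2" .
+#   docker run --rm -v $(pwd):/data mmcif-validate \
+#          validate-mmcif-file -a /data/associated -o /data/report.json \
+#          /data/model.cif
+#
+# Since the entrypoint simply runs 'exec "$@"' for any non-help argument,
+# validate-mmcif-file is called like a regular command inside the container.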
+# for Celery:
+# - depends_on without implementing the 'waits' in this entrypoint.sh:
+#   https://marcopeg.com/docker-compose-healthcheck/
+
+
+# LocalWords:  ENV DICTS SRC tmp schwedelab RCSB WORKDIR pipefail apk dev ARG
+# LocalWords:  ARGs
diff --git a/validation/entrypoint.sh b/validation/entrypoint.sh
new file mode 100644
index 0000000..224d7d6
--- /dev/null
+++ b/validation/entrypoint.sh
@@ -0,0 +1,30 @@
+#!/bin/sh
+## (We use sh since Alpine does not have Bash by default)
+
+
+## Exit immediately on commands with a non-zero exit status.
+set -euo pipefail
+
+## When started without any arguments or with "-h", "--help", "-help" or
+## "help", print usage.
+if [ $# -eq 0 ] || [ "x$1" = "x-h" ] || [ "x$1" = "x--help" ] ||
+   [ "x$1" = "x-help" ] || [ "x$1" = "xhelp" ]; then
+    echo "    mmCIF file format validation tool."
+    echo "------------------------------------------"
+    echo "Provided by SWISS-MODEL / Schwede group"
+    echo "(swissmodel.expasy.org / schwedelab.org)"
+    echo ""
+    echo "This container checks that mmCIF files are"
+    echo "properly formatted according to the"
+    echo "MAX/mmCIF dictionary. At the moment,"
+    echo "there is one tool available that acts as a"
+    echo "command line tool: validate-mmcif-file."
+    echo "For further usage information, call this"
+    echo "container executing"
+    echo "'validate-mmcif-file --help'."
+    exit 1
+fi
+
+exec "$@"
+
+# LocalWords:  euo pipefail eq Schwede schwedelab mmcif fi
diff --git a/validation/get-mmcif-dict-versions.py b/validation/get-mmcif-dict-versions.py
new file mode 100755
index 0000000..da1f1ea
--- /dev/null
+++ b/validation/get-mmcif-dict-versions.py
@@ -0,0 +1,157 @@
+#! /usr/local/bin/python
+"""Get version and location of relevant mmCIF dictionaries for ModelCIF.
+
+Write the versions of the ModelCIF dictionary, and of the PDBx/mmCIF
+dictionary used to build it, into a JSON file.
+"""
+# pylint: disable=invalid-name
+# pylint: enable=invalid-name
+
+import argparse
+import sys
+
+import rapidjson as json
+
+from mmcif.io.IoAdapterPy import IoAdapterPy
+
+
+def _parse_command_line():
+    """Get arguments."""
+    parser = argparse.ArgumentParser(description=__doc__)
+
+    parser.add_argument(
+        "dic_file",
+        type=str,
+        metavar="<DICTIONARY FILE>",
+        help="The mmCIF dictionary file to read the versions from.",
+    )
+    parser.add_argument(
+        "--parent",
+        "-p",
+        type=str,
+        metavar="<NAME OF PARENT DICT>",
+        help="Name of the 'parent' dictionary. This is the one the other "
+        + "dictionary is appended to. This is usually the "
+        + "mmcif_pdbx_v50.dic.",
+        default="mmcif_pdbx_v50.dic",
+    )
+    parser.add_argument(
+        "--output",
+        "-o",
+        type=str,
+        metavar="<PATH TO VERSION FILE>",
+        help="Path to store the JSON file with the versions at.",
+        default="mmcif_ma_version.json",
+    )
+    parser.add_argument(
+        "--parent-location",
+        "-u",
+        type=str,
+        metavar="<URL OF PARENT DICT FILE>",
+        help="Download location of the parent dictionary file.",
+        default=None,
+    )
+    parser.add_argument(
+        "--child-location",
+        "-l",
+        type=str,
+        metavar="<URL OF CHILD DICT FILE>",
+        help="Download location of the child dictionary file.",
+        default=None,
+    )
+    opts = parser.parse_args()
+
+    return opts
+
+
+def _error(msg):
+    """Print a final error message."""
+    print(msg + "\nAborting.", file=sys.stderr)
+    sys.exit(1)
+
+
+def _get_data_cat(cat, file_name, data):
+    """Get a data category from a mmCIF data blob."""
+    obj = data.getObj(cat)
+    if obj is None:
+        _error(f"No '{cat}' object found in '{file_name}'.")
+
+    return obj
+
+
+def _get_data_item(itm, cat, file_name, cat_data):
+    """Get a single data item from a data category."""
+    val = cat_data.getAttributeValueList(itm)
+    if len(val) != 1:
+        _error(
+            f"Expected exactly 1 '{cat}.{itm}' in '{file_name}', "
+            + f"found '{', '.join(val)}'."
+        )
+
+    return val[0]
+
+
+def _get_versions(dic_file, parent_name, io_adapter):
+    """Fetch the 'dictionary' category and assemble versions for the
+    dictionary itself and its parent dictionary."""
+
+    dic = io_adapter.readFile(inputFilePath=dic_file)
+
+    # fetch a data container from the list returned by the adapter
+    cntnr = None
+    for obj in dic:
+        if "dictionary" in obj.getObjNameList():
+            cntnr = obj
+            break
+
+    if cntnr is None:
+        _error(f"No 'dictionary' object found in '{dic_file}'.")
+
+    dic = _get_data_cat("dictionary", dic_file, cntnr)
+
+    vrsn = _get_data_item("version", "dictionary", dic_file, dic)
+    ttl = _get_data_item("title", "dictionary", dic_file, dic)
+    dic_version = {"title": ttl, "version": vrsn}
+
+    cmp = _get_data_cat("pdbx_dictionary_component", dic_file, cntnr)
+    dc_idx = cmp.getAttributeIndex("dictionary_component_id")
+    vs_idx = cmp.getAttributeIndex("version")
+    prnt_version = None
+    for row in cmp:
+        if row[dc_idx] == parent_name:
+            vrsn = row[vs_idx]
+            prnt_version = {"title": parent_name, "version": vrsn}
+            break
+    if prnt_version is None:
+        _error(
+            f"No component '{parent_name}' found in "
+            + f"'pdbx_dictionary_component' of '{dic_file}'."
+        )
+
+    return dic_version, prnt_version
+
+
+def _add_dict_location(parent, child, parent_loc, child_loc):
+    """Add URLs to the dictionary versions if available."""
+    if parent_loc is None:
+        parent["location"] = "."
+    else:
+        parent["location"] = parent_loc
+    if child_loc is None:
+        child["location"] = "."
+    else:
+        child["location"] = child_loc
+
+
+def _main():
+    """Run as script."""
+    opts = _parse_command_line()
+
+    io_adapter = IoAdapterPy(False, sys.stdout)
+    c_vrsn, p_vrsn = _get_versions(opts.dic_file, opts.parent, io_adapter)
+
+    _add_dict_location(
+        p_vrsn, c_vrsn, opts.parent_location, opts.child_location
+    )
+    with open(opts.output, "w", encoding="utf8") as jfh:
+        json.dump({"versions": [p_vrsn, c_vrsn]}, jfh)
+
+
+if __name__ == "__main__":
+    _main()
+
+# LocalWords:  DictToSdb SDB PDBx CifCheck pylint mmcif pdbx dic nAborting
+# LocalWords:  macromolecular utf
diff --git a/validation/validate-mmcif-file.py b/validation/validate-mmcif-file.py
new file mode 100755
index 0000000..3eaa64e
--- /dev/null
+++ b/validation/validate-mmcif-file.py
@@ -0,0 +1,687 @@
+#! /usr/local/bin/python
+"""Validate mmCIF format in a model mmCIF file.
+
+Does not check if the model/ coordinates make sense.
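+A typical invocation (file and directory names are examples) looks like:
+
+  validate-mmcif-file -a ./associated -o report.json model.cif
+
+The result is reported as JSON, written to stdout or, with --out-file, to a
+file.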
+Associated cif files are included in the check by merging them into the model
+file. That is, associated files with quality scores stored in mmCIF format,
+as an example, are merged with the model file and checked, while associated
+MSA files in FASTA format cannot be merged into the model mmCIF file and
+thus are not checked.
+"""
+# pylint: disable=invalid-name
+# pylint: enable=invalid-name
+
+import argparse
+import atexit
+import copy
+import os
+import subprocess
+import sys
+import tempfile
+
+import rapidjson as json
+
+
+from mmcif.api.DataCategory import DataCategory
+from mmcif.api.PdbxContainers import DataContainer
+from mmcif.io.PdbxReader import PdbxReader
+from mmcif.io.PdbxWriter import PdbxWriter
+
+
+def _parse_command_line():
+    """Get arguments."""
+    parser = argparse.ArgumentParser(description=__doc__)
+
+    parser.add_argument(
+        "model_cif",
+        type=str,
+        metavar="<MODEL MMCIF FILE>",
+        help="Path to the model mmCIF file. This is the 'main' cif file of a "
+        + "modelling project including coordinates.",
+    )
+    parser.add_argument(
+        "--associates-dir",
+        "-a",
+        type=str,
+        metavar="<DIR>",
+        help="Path to associated files, needed when the mmCIF file has "
+        + "external files attached.",
+        default=None,
+    )
+    parser.add_argument(
+        "--dict-sdb",
+        "-d",
+        type=str,
+        metavar="<SDB FILE>",
+        help="The dictionary in SDB format used for checking.",
+        default="/usr/local/share/mmcif-dict-suite/mmcif_ma.sdb",
+    )
+    parser.add_argument(
+        "--out-file",
+        "-o",
+        type=str,
+        metavar="<JSON FILE>",
+        help="Write the JSON output to file. Default is to write to stdout.",
+        default=None,
+    )
+    parser.add_argument(
+        "--extend-validated-file",
+        "-e",
+        nargs="?",
+        const=" same ",
+        metavar="<FILE>",
+        help="Extend a positively validated mmCIF file with dictionary "
+        + "versions. If invoked without argument, write to the input model "
+        + "mmCIF file, otherwise specify a file name. Please note, the "
+        + "dictionary versions will not be added to the mmCIF file if there "
+        + "are any issues.",
+        default=None,
+    )
+    parser.add_argument(
+        "--verbose",
+        "-v",
+        action="store_true",
+        help="Write some messages to stdout instead of just having it as "
+        + "JSON. Useful for debugging on the command line.",
+    )
+
+    opts = parser.parse_args()
+
+    # post process arguments
+    if opts.extend_validated_file is not None:
+        if opts.extend_validated_file == " same ":
+            opts.extend_validated_file = opts.model_cif
+
+    return opts
+
+
+def _error(msg):
+    """Print a final error message."""
+    print(msg + "\nAborting.", file=sys.stderr)
+    sys.exit(1)
+
+
+def _warn(msg):
+    """Print a warning message."""
+    print(f"WARNING: {msg}", file=sys.stderr)
+
+
+def _parse_cifcheck_stderr(stderr):
+    """Parse the error output of CifCheck."""
+    error_lst = []
+    for line in stderr.splitlines():
+        if line.startswith("Message: "):
+            error_lst.append(line[10:].rsplit('"', 1)[0])
+        elif line.startswith("cp: "):
+            error_lst.append(line[4:])
+        else:
+            _error(f"Unknown error output found: '{line}'")
+
+    return error_lst
+
+
+def _parse_parser_file(filename):
+    """Parse the parser output file of CifCheck."""
+    parserfile = filename + "-parser.log"
+    if not os.path.exists(parserfile):
+        return []
+
+    error_lst = []
+    with open(parserfile, encoding="utf-8") as dfh:
+        for line in dfh:
+            line = line.strip()
+            error_lst.append(line)
+
+    # remove the parser log file
+    os.unlink(parserfile)
+
+    return error_lst
+
+
+def _parse_diag_file(filename):
+    """Parse the diagnosis file of CifCheck."""
+    # CifCheck places the diag file in the cwd.
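+    # For an input file "model.cif" (example name), the diagnosis is thus
+    # expected at "model.cif-diag.log" next to the input; the file is parsed
+    # below and removed again afterwards.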
+ diagfile = filename + "-diag.log" + if not os.path.exists(diagfile): + return [] + + error_lst = [] + # CifCheck outputs diag files as iso-8859 + with open(diagfile, encoding="iso-8859-1") as dfh: + for line in dfh: + line = line.strip() + if line == "": + continue + error_lst.append(line) + + # remove the diag file + os.unlink(diagfile) + + return error_lst + + +class _CifCheckFailedError(RuntimeError): + """Raise for failed CifCheck runs but include error messages.""" + + def __init__(self, cifcheck_cmd, error_lst): + """Create an exception""" + super().__init__(f"CifCheck failed for {' '.join(cifcheck_cmd)}") + self.cifcheck_errors = error_lst + self.cifcheck_cmd = cifcheck_cmd + + +def _read_mmcif(filepath): + """Read a mmCIF file""" + data_lst = [] + with open(filepath, encoding="utf-8") as ifh: + prd = PdbxReader(ifh) + prd.read(data_lst) + + return data_lst + + +def _write_mmcif(filepath, cif_data): + """Write data to mmCIF file""" + with open(filepath, "w", encoding="ascii") as ofh: + cifwriter = PdbxWriter(ofh) + # save a lot of whitespaces! + cifwriter.setAlignmentFlag(flag=False) + cifwriter.write(cif_data) + + +def _get_entry_id(cif_datablock, entry_id_map, datablock_idx): + """Get a mapping of the entry.id from a cif datablock.""" + entry = cif_datablock.getObj("entry") + if entry is not None: + eidx = entry.getAttributeIndex("id") + if eidx != -1: + for row in entry: + entry_id_map[row[eidx]] = datablock_idx + + +def _get_associated_files(model_cif_file): + """Get the list of associated files from a model cif file.""" + # This is an intermediate step, so we do not need to check/ report anything + # here. The actual confirmation comes out of CifCheck at a later stage. + mdl_cif = _read_mmcif(model_cif_file) + + entry_id_map = {} + assoc_files = [] + idxs = {} + for i, pdbx_cntnr in enumerate(mdl_cif): + # gather entry.id's for later + _get_entry_id(pdbx_cntnr, entry_id_map, i) + meafs = pdbx_cntnr.getObj("ma_entry_associated_files") + # If ma_entry_associated_files is not present then + # ma_associated_archive_file_details can't exist either since it has a + # ma_entry_associated_files.id relation. (CifCheck should notice that.) + if meafs is None: + continue + not_found = False + for j in ["file_format", "file_url", "entry_id"]: + idxs[j] = meafs.getAttributeIndex(j) + if idxs[j] == -1: + not_found = True + break + if not_found: + continue + for row in meafs: + if row[idxs["file_format"]] != "cif": + continue + assoc_files.append((row[idxs["file_url"]], row[idxs["entry_id"]])) + # make sure entry_id is matching in associated file! + maafd = pdbx_cntnr.getObj("ma_associated_archive_file_details") + if maafd is None: + continue + idxs["file_format"] = maafd.getAttributeIndex("file_format") + if idxs["file_format"] == -1: + continue + for row in maafd: + if row[idxs["file_format"]] == "cif": + raise NotImplementedError( + "Fetching associated cif files from archive." + ) + + return assoc_files, mdl_cif, entry_id_map + + +def _cmp_cif_rows(a_row, b_row, a_idxs, b_idxs, attrs): + """Compare two cif rows by given attributes""" + for i in attrs: + if a_row[a_idxs[i]] != b_row[b_idxs[i]]: + return False + + return True + + +def _add_row(row, src_idxs, dest_idxs, dest, attrs_l): + """Add a data row to an existing datablock with the right item order.""" + # create a new row fitting dest's order + new_row = list("?" 
* attrs_l)
+    for i, j in src_idxs.items():
+        new_row[dest_idxs[i]] = row[j]
+    dest.append(new_row)
+
+
+def _add_or_extend_rows(src, dest, common, not_in_dest):
+    """Mix/ add rows from src into dest."""
+    # extend dest with new attributes
+    for attr in not_in_dest:
+        dest.appendAttribute(attr)
+    s_idx = src.getAttributeIndexDict()
+    d_idx = dest.getAttributeIndexDict()
+    attrs_l = len(d_idx)
+    d_rows = list(range(len(dest)))
+    for src_row in src:
+        match = False
+        for i in d_rows:
+            dest_row = dest[i]
+            match = _cmp_cif_rows(src_row, dest_row, s_idx, d_idx, common)
+            if match:
+                # extend with missing data items
+                for attr in not_in_dest:
+                    dest_row.append(src_row[s_idx[attr]])
+                d_rows.remove(i)
+                break
+        if not match:
+            _add_row(src_row, s_idx, d_idx, dest, attrs_l)
+    # extend dest rows that never matched with "?" as default value
+    for i in d_rows:
+        dest_row = dest[i]
+        for attr in not_in_dest:
+            dest_row.append("?")
+
+
+def _merge_cif_datacontainer(
+    parent_datablock, datablock, exclude_categories=None
+):
+    """Merge datablock into parent_datablock ignoring exclude_categories."""
+    for category in datablock.getObjNameList():
+        if exclude_categories is not None and category in exclude_categories:
+            continue
+        db_ctgry = datablock.getObj(category)
+        # check if the data category exists in parent
+        if parent_datablock.exists(category):
+            p_ctgry = parent_datablock.getObj(category)
+            # compare items
+            not_in_p, in_both, _ = db_ctgry.cmpAttributeNames(p_ctgry)
+            _add_or_extend_rows(db_ctgry, p_ctgry, in_both, not_in_p)
+        else:
+            # data category does not exist in parent, append it to datablock
+            parent_datablock.append(db_ctgry)
+
+
+def _try_os_remove(path):
+    """Try to remove a file, don't complain if that fails."""
+    try:
+        os.remove(path)
+    except:  # pylint: disable=bare-except
+        pass
+
+
+def _merge_cif_data(
+    model_cif_data, associated_path, row_entry_id, entry_id_map
+):
+    """Merge contents of an associated file into cif data."""
+    error_msgs = {"cifcheck-errors": []}
+    assoc_cif = _read_mmcif(associated_path)
+
+    # per datablock, check to which datablock it belongs in the parent cif
+    for assoc_cntnr in assoc_cif:
+        # check/ get 'entry_link'
+        assoc_entry_link = assoc_cntnr.getObj("entry_link")
+        if assoc_entry_link is None:
+            error_msgs["cifcheck-errors"].append(
+                'ERROR - category "entry_link" is mandatory, but it is not '
+                + f'present in datablock "{assoc_cntnr.getName()}"'
+            )
+            continue
+        # make sure entry_id exists for entry_link
+        entry_id_idx = assoc_entry_link.getAttributeIndex("entry_id")
+        if entry_id_idx == -1:
+            error_msgs["cifcheck-errors"].append(
+                f'ERROR - In block "{assoc_cntnr.getName()}", mandatory item '
+                + '"entry_id" is not in category "entry_link"'
+            )
+            continue
+        # For each entry_id, look up the corresponding datablock in
+        # model_cif_data and merge with that datablock.
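+        # entry_id_map (built from the model file) maps entry.id values to
+        # datablock indices in model_cif_data, e.g. {"model_1": 0}
+        # (illustrative); rows pointing to a different entry than the one
+        # referenced in ma_entry_associated_files are reported below.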
+        for row in assoc_entry_link:
+            entry_id = row[entry_id_idx]
+            if entry_id != row_entry_id:
+                error_msgs["cifcheck-errors"].append(
+                    f'ERROR - In block "{assoc_cntnr.getName()}", item '
+                    + '"entry_id" does not match item '
+                    + '"ma_entry_associated_files.entry_id"'
+                )
+                continue
+            _merge_cif_datacontainer(
+                model_cif_data[entry_id_map[entry_id]],
+                assoc_cntnr,
+                exclude_categories=["entry_link"],
+            )
+
+    return error_msgs
+
+
+class _CifCheck:
+    """Handling the CifCheck tool."""
+
+    def __init__(self, dict_sdb, json_out_file=None, verbose=False):
+        self._version = None
+        self.check_results = {}
+        self.dict_sdb = os.path.abspath(dict_sdb)
+        self.json_out_file = json_out_file
+        self.verbose = verbose
+
+    @property
+    def version(self):
+        """Get version dictionary if available."""
+        if self._version is not None:
+            return self._version
+        vrsn_file = os.path.splitext(self.dict_sdb)[0] + "_version.json"
+        try:
+            with open(vrsn_file, "r", encoding="utf-8") as jfh:
+                vrsn = json.load(jfh)
+        except FileNotFoundError:
+            self._version = {
+                "versions": [
+                    {"title": None, "version": None, "location": None}
+                ]
+            }
+            self.add_general_error(
+                f"Version JSON file not found at '{vrsn_file}'"
+            )
+        else:
+            self._version = vrsn
+
+        return self._version
+
+    def add_general_error(self, msg):
+        """Add an uncategorised error to the list."""
+        if "errors" not in self.check_results:
+            self.check_results["errors"] = [msg]
+        else:
+            self.check_results["errors"].append(msg)
+
+    def _execute(self, filepath):
+        """Execute the CifCheck tool on a model mmCIF file."""
+        # If permission errors occur with the source directory of the CIF
+        # file, consider copying the file to a Python tempfile generated
+        # path. That deals with missing $TMP, $TEMP, etc. variables.
+        # At the moment, cwd is switched to the source directory since
+        # CifCheck copies the file otherwise.
+        cifcheck_filepath = os.path.basename(filepath)
+        cifcheck_cmd = [
+            "CifCheck",
+            "-dictSdb",
+            self.dict_sdb,
+            "-f",
+            cifcheck_filepath,
+        ]
+        cifcheck_wd = os.path.dirname(os.path.abspath(filepath))
+        cps = subprocess.run(
+            cifcheck_cmd,
+            stdout=subprocess.DEVNULL,
+            stderr=subprocess.PIPE,
+            check=False,
+            universal_newlines=True,
+            cwd=cifcheck_wd,
+        )
+        error_lst = []
+        # get error messages on the command line
+        filename = os.path.basename(filepath)
+        if cps.returncode != 0:
+            error_lst = _parse_cifcheck_stderr(cps.stderr)
+            error_lst.extend(_parse_parser_file(filename))
+            raise _CifCheckFailedError(cifcheck_cmd, error_lst)
+
+        # get messages from diagnosis file
+        error_lst.extend(_parse_diag_file(os.path.join(cifcheck_wd, filename)))
+
+        return error_lst
+
+    def run(self, cif_file):
+        """Run CifCheck for a given file and catch the output.
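+
+        Results end up in self.check_results: "status" is "completed" or
+        "aborted", format problems are listed under "diagnosis" and
+        CifCheck's own error messages under "cifcheck-errors". A completed
+        run may yield, e.g., {"status": "completed", "diagnosis": [...]}
+        (values here being illustrative).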
+ + Returns False if the CifCheck execution itself failed.""" + try: + format_errors = self._execute(cif_file) + except _CifCheckFailedError as exc: + if self.verbose: + _warn("failed to run CifCheck, Stopping.") + for line in exc.args: + print(line, file=sys.stderr) + print("CifCheck errors:", file=sys.stderr) + for line in exc.cifcheck_errors: + print(" ", line, file=sys.stderr) + + self.check_results["status"] = "aborted" + self.check_results["cifcheck-command"] = " ".join(exc.cifcheck_cmd) + self.check_results["cifcheck-errors"] = exc.cifcheck_errors + + return False + + self.check_results["status"] = "completed" + self.check_results["diagnosis"] = format_errors + + return True + + def got_issues(self): + """Query if there are parser or diagnosis messages.""" + if ( + "diagnosis" in self.check_results + and len(self.check_results["diagnosis"]) > 0 + ): + return True + + if ( + "cifcheck-errors" in self.check_results + and len(self.check_results["cifcheck-errors"]) > 0 + ): + return True + + return False + + def _update_audit_conform(self, ac_cat): + """Update an existing audit_conform category entry.""" + # check if name is there, if not, append + nm_idx = ac_cat.getAttributeIndex("dict_name") + vs_idx = ac_cat.getAttributeIndex("dict_version") + lc_idx = ac_cat.getAttributeIndex("dict_location") + for dct in self.version["versions"]: + found = False + for itm in ac_cat: + if dct["title"] == itm[nm_idx]: + itm[vs_idx] = dct["version"] + itm[lc_idx] = dct["location"] + found = True + break + if not found: + new_ac = [""] * 3 + new_ac[nm_idx] = dct["title"] + new_ac[vs_idx] = dct["version"] + new_ac[lc_idx] = dct["location"] + ac_cat.append(new_ac) + + def _add_audit_conform(self, pdbx_cntnr, mdl_cif, container_idx): + """Add audit_conform category entry to data container.""" + ac_cat = DataCategory( + "audit_conform", + ["dict_name", "dict_version", "dict_location"], + [ + [x["title"], x["version"], x["location"]] + for x in self.version["versions"] + ], + ) + # We want nicely formatted cif files, so place audit_conform + # after entry. + objs = pdbx_cntnr.getObjCatalog() + names = list(objs.keys()) + pdbx_cntnr = DataContainer(pdbx_cntnr.getName()) + found = False + while len(names) > 0: + nme = names.pop(0) + pdbx_cntnr.append(objs[nme]) + if nme == "entry": + pdbx_cntnr.append(ac_cat) + found = True + break + for nme in names: + pdbx_cntnr.append(objs[nme]) + if not found: + pdbx_cntnr.append(ac_cat) + mdl_cif[container_idx] = pdbx_cntnr + + def add_versions_to_mmcif_file(self, mdl_cif, dest_file): + """Add versions of mmCIF dictionaries to a mmCIF file. + + :param mdl_cif: CIF data to be equipped with version data. + :type mdl_cif: :class:`list` of DataContainer + :param dest_file: Path to write the modified file to. 
:type dest_file: :class:`str`
+        """
+        # add/ modify audit_conform category
+        for i, pdbx_cntnr in enumerate(mdl_cif):
+            ac_cat = pdbx_cntnr.getObj("audit_conform")
+            if ac_cat is not None:
+                self._update_audit_conform(ac_cat)
+            else:
+                self._add_audit_conform(pdbx_cntnr, mdl_cif, i)
+
+        # write modified mmCIF containers to file
+        _write_mmcif(dest_file, mdl_cif)
+
+        return mdl_cif
+
+    def to_json(self):
+        """Get CifCheck results as JSON."""
+        self.check_results.update(self.version)
+        return json.dumps(self.check_results)
+
+    def make_json_output(self):
+        """Dump JSON results of CifCheck either as file or print to stdout."""
+        if self.verbose:
+            print("=============== CifCheck Errors ==============")
+            if "cifcheck-errors" in self.check_results:
+                for line in self.check_results["cifcheck-errors"]:
+                    print(line)
+            sys.stdout.write("\n")
+            print("============= CifCheck Diagnosis =============")
+            if "diagnosis" in self.check_results:
+                for line in self.check_results["diagnosis"]:
+                    print(line)
+            sys.stdout.write("\n")
+
+        json_data = self.to_json()
+        if self.verbose or not self.json_out_file:
+            print(json_data)
+
+        if self.json_out_file is not None:
+            with open(self.json_out_file, "w", encoding="utf-8") as jfh:
+                jfh.write(json_data)
+
+    def add_to_results(self, msgs):
+        """Add messages to the CifCheck results."""
+        if "cifcheck-errors" not in self.check_results:
+            self.check_results["cifcheck-errors"] = msgs["cifcheck-errors"]
+        else:
+            self.check_results["cifcheck-errors"].extend(
+                msgs["cifcheck-errors"]
+            )
+
+
+def _find_utf(line):
+    """Return the index of the first non-ASCII character in a string."""
+    for i, ltr in enumerate(line):
+        try:
+            ltr.encode("ascii", "strict")
+        except UnicodeEncodeError:
+            return i
+
+    return None
+
+
+def _file_has_utf(filename, cifcheck):
+    """Check that a file contains no UTF characters, as mmCIF only allows
+    ASCII."""
+    with open(filename, encoding="utf-8") as cfh:
+        for i, line in enumerate(cfh, start=1):
+            try:
+                line.encode("ascii", "strict")
+            except UnicodeEncodeError:
+                idx = _find_utf(line)
+                cifcheck.add_general_error(
+                    "File is non-ascii as it has a UTF character in line "
+                    + f"{i}, index {idx}."
+                )
+                return True
+
+    return False
+
+
+def _main():
+    """Run as script."""
+    opts = _parse_command_line()
+
+    # set up the CifCheck tool
+    cifcheck = _CifCheck(opts.dict_sdb, opts.out_file, opts.verbose)
+
+    if _file_has_utf(opts.model_cif, cifcheck):
+        cifcheck.make_json_output()
+        sys.exit(1)
+
+    # do a first check of the model cif alone to make sure it's actually cif
+    success = cifcheck.run(opts.model_cif)
+    if not success:
+        cifcheck.make_json_output()
+        sys.exit(1)
+
+    # check for associated files referenced by the model cif file
+    assoc_files, model_cif_data, entry_id_map = _get_associated_files(
+        opts.model_cif
+    )
+    # save original data for later
+    if opts.extend_validated_file is not None:
+        o_model_cif_data = copy.deepcopy(model_cif_data)
+    # make sure associated files exist and merge all of them into the model
+    for assoc, entry_id in assoc_files:
+        assoc_path = os.path.join(opts.associates_dir, assoc)
+        # CifCheck the file to make sure it's actually cif; diagnosis
+        # messages do not matter at this point, as an incomplete file is
+        # being tested.
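+        # (The merged model plus associated data is validated once more as a
+        # whole further down, via a temporary file, so a full diagnosis here
+        # would only be duplicated.)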
+ success = cifcheck.run(assoc_path) + if not success: + cifcheck.make_json_output() + sys.exit(1) + # merge the model.cif and the associated file + msgs = _merge_cif_data( + model_cif_data, assoc_path, entry_id, entry_id_map + ) + cifcheck.add_to_results(msgs) + + validate_file = opts.model_cif + if assoc_files: + # write merged data to disk, create tmp file, clean up when done + cfh, cfn = tempfile.mkstemp(suffix=".cif", text=True) + # register for deletion here and in cwd + atexit.register(_try_os_remove, cfn) + os.close(cfh) + _write_mmcif(cfn, model_cif_data) + validate_file = cfn + + # validate file + success = cifcheck.run(validate_file) + if not success: + cifcheck.make_json_output() + sys.exit(1) + + # upon request (-e) extend the ORIGINAL file (not the merged one) + if not cifcheck.got_issues() and opts.extend_validated_file is not None: + cifcheck.add_versions_to_mmcif_file( + o_model_cif_data, opts.extend_validated_file + ) + + # return JSON as file or to stdout + if opts.out_file and opts.verbose: + print(f"Writing results of CifCheck to '{opts.out_file}'") + cifcheck.make_json_output() + + +if __name__ == "__main__": + _main() + +# LocalWords: cif MSA FASTA pylint stdout CifCheck param src str dest cwd -- GitLab