Commit 65ec1620 authored by B13nch3n

Add validation tool

parent 9f8adf37
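Dockerfile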
ARG VERSION_PYTHON="3.6.15"
ARG VERSION_BASE_IMAGE="python:${VERSION_PYTHON}-alpine3.15"
FROM ${VERSION_BASE_IMAGE}
# We need to declare ARGs again which were declared before the build stage
# (FROM directive), otherwise they won't be available in this stage.
ARG VERSION_PYTHON
ARG VERSION_BASE_IMAGE
ARG VERSION_CPP_DICT_PACK="v2.500"
ARG VERSION_PY_MMCIF="0.76"
## Set up environment
ENV MMCIF_DICTS_DIR="/usr/local/share/mmcif-dict-suite" \
SRC_DIR="/tmp" \
VERSION_CPP_DICT_PACK=${VERSION_CPP_DICT_PACK} \
VERSION_BASE_IMAGE=${VERSION_BASE_IMAGE} \
VERSION_PYTHON=${VERSION_PYTHON} \
VERSION_PY_MMCIF=${VERSION_PY_MMCIF} \
PYTHONUNBUFFERED=1 \
PYTHONDONTWRITEBYTECODE=1
LABEL org.modelarchive.base-image="${VERSION_BASE_IMAGE}"
LABEL org.modelarchive.cpp-dict-pack.version="${VERSION_CPP_DICT_PACK}"
LABEL maintainer="Stefan Bienert <stefan.bienert@unibas.ch>"
LABEL vendor1="Schwede Group (schwedelab.org)"
LABEL vendor2="SIB - Swiss Institute of Bioinformatics (sib.swiss)"
LABEL vendor3="Biozentrum - University of Basel (biozentrum.unibas.ch)"
## Install the RCSB CPP Dict Suite (only the binaries we need)
WORKDIR ${SRC_DIR}
RUN set -eo pipefail; \
export DICT_PACK_SRC_DIR="${SRC_DIR}/cpp-dict-pack.git"; \
apk update; \
apk upgrade; \
apk add abuild binutils bison build-base cmake flex git gcc \
extra-cmake-modules tcsh; \
#
## Install the RCSB mmCIF Dict Suite
git clone -b ${VERSION_CPP_DICT_PACK} \
--single-branch --recurse-submodules \
https://github.com/rcsb/cpp-dict-pack.git \
${DICT_PACK_SRC_DIR}; \
mkdir ${DICT_PACK_SRC_DIR}/build; \
    cd ${DICT_PACK_SRC_DIR}/build; \
cmake .. -DCMAKE_VERBOSE_MAKEFILE=ON; \
make; \
for cif_tool in CifCheck DictToSdb; do \
mv bin/${cif_tool} /usr/local/bin; \
done; \
cd ${SRC_DIR}; \
rm -r ${DICT_PACK_SRC_DIR}; \
#
## Install the RCSB py-mmcif Python module
/usr/local/bin/python -m pip install --upgrade pip; \
/usr/local/bin/python -m pip install mmcif==${VERSION_PY_MMCIF} \
python-rapidjson; \
#
## Clean up/ remove unnecessary stuff
apk del abuild binutils bison build-base cmake flex git gcc \
extra-cmake-modules tcsh; \
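    #
    ## CifCheck and DictToSdb are C++ binaries, so keep the C++ runtime
    ## around now that the build tools are gone.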
apk add libstdc++
## Add a dedicated user for mmCIF file validation
## MMCIF_USER_ID can be used to avoid file permission issues in development.
ARG MMCIF_USER_ID=501
RUN adduser -S -u ${MMCIF_USER_ID} mmcif-vldtr
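## Example (hypothetical): match the container user to the host user to
## avoid permission issues in development:
##   docker build --build-arg MMCIF_USER_ID=$(id -u) .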
## Copy tools (already in use during dictionary SDB creation)
COPY --chmod=755 get-mmcif-dict-versions.py \
/usr/local/bin/get-mmcif-dict-versions
## Create dictionaries for validating mmCIF files. To rebuild the
## dictionaries, rebuild the container with build argument
## DICT_FETCH_DATE="<DATE>.n" so that only the RUN command building the
## dictionaries is triggered. The ".n" should be an increasing number to
## allow multiple builds on the same day, in case something goes wrong.
## Dictionaries do not change that frequently, so we skip the hassle of
## keeping them in an external volume.
## To explore development versions of the MAX/mmCIF dictionary, right out of
## the Git repo, build with USE_DICT_RELEASE="dev". Default is "master" which
## loads from the master branch at https://github.com/ihmwg/ModelCIF.
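## Example rebuild (hypothetical values):
##   docker build --build-arg DICT_FETCH_DATE="2022-05-03.1" \
##                --build-arg USE_DICT_RELEASE="dev" .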
ARG DICT_FETCH_DATE="2022-05-02.1"
ARG USE_DICT_RELEASE="master"
ENV DICT_FETCH_DATE=${DICT_FETCH_DATE}
ENV USE_DICT_RELEASE=${USE_DICT_RELEASE}
LABEL org.modelarchive.dict-fetch-date="${DICT_FETCH_DATE}"
LABEL org.modelarchive.dict_release="${USE_DICT_RELEASE}"
WORKDIR ${SRC_DIR}
RUN set -eo pipefail; \
apk add curl; \
export _DICT_DIR="${SRC_DIR}/mmcif_dicts"; \
export _DICT_URL="https://mmcif.wwpdb.org/dictionaries/ascii"; \
export _PATHSPEC="a24fcfa8d6c3ceb4b6f0676bcc341ac0cd24ff1f"; \
export _REPO_URL="https://raw.githubusercontent.com/ihmwg/ModelCIF/${_PATHSPEC}"; \
export _MA_DICT_URL="${_REPO_URL}/dist/mmcif_ma.dic"; \
export _DICT_REPO="ModelCIF.git"; \
mkdir ${_DICT_DIR}; \
mkdir ${MMCIF_DICTS_DIR}; \
cd ${_DICT_DIR}; \
#
## Fetch the dictionary definition language
curl ${_DICT_URL}/mmcif_ddl.dic.gz -s -o mmcif_ddl.dic.gz; \
gunzip *.gz; \
#
    ## Fetch the merged ModelCIF dictionary (pinned to a Git commit via
    ## _PATHSPEC)
curl ${_MA_DICT_URL} -s -L -o mmcif_ma.dic; \
#
## Build the ModelCIF SDB
DictToSdb -ddlFile mmcif_ddl.dic \
-dictFile mmcif_ma.dic \
-dictSdbFile ${MMCIF_DICTS_DIR}/mmcif_ma.sdb; \
#
## Fetch the stable PDBx/mmCIF dictionary
curl ${_DICT_URL}/mmcif_pdbx_v50.dic -s -o mmcif_pdbx_v50.dic; \
#
## Build the PDBx/mmCIF SDB
DictToSdb -ddlFile mmcif_ddl.dic \
-dictFile mmcif_pdbx_v50.dic \
-dictSdbFile ${MMCIF_DICTS_DIR}/mmcif_pdbx_v50.dic.sdb; \
#
## Get versions of ModelCIF & PDBx/mmCIF dictionaries
get-mmcif-dict-versions --parent-location ${_REPO_URL}/base/mmcif_pdbx_v50.dic \
--child-location ${_MA_DICT_URL} \
mmcif_ma.dic; \
mv mmcif_ma_version.json ${MMCIF_DICTS_DIR}/; \
#
## Make SDBs readable and keep possible error logs from building them
mv *.log ${MMCIF_DICTS_DIR}/ 2>/dev/null || :; \
chmod o+r ${MMCIF_DICTS_DIR}/*; \
#
## Clean up
cd ${SRC_DIR}; \
rm -r ${_DICT_DIR}; \
apk del curl
COPY --chmod=755 entrypoint.sh /
COPY --chmod=755 validate-mmcif-file.py /usr/local/bin/validate-mmcif-file
# for development
#RUN set -eo pipefail; \
# apk add bash emacs gcc build-base; \
# /usr/local/bin/python -m pip install pylint black; \
# apk del gcc build-base
USER mmcif-vldtr
ENTRYPOINT ["/entrypoint.sh"]
# have tool ready
# - entrypoint: validate... just runs validation, celery runs celery, CMD else
# write Python to run & check mmCIF
# - Note dictionary versions in the mmCIF file!
# for Celery:
# - depends_on without implementing the 'waits' in this entrypoint.sh:
# https://marcopeg.com/docker-compose-healthcheck/
# LocalWords: ENV DICTS SRC tmp schwedelab RCSB WORKDIR pipefail apk dev ARG
# LocalWords: ARGs
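entrypoint.sh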
#!/bin/sh
## (We use sh since Alpine does not have Bash by default)
## Exit immediately on commands with a non-zero exit status, treat unset
## variables as errors and fail on errors inside pipes.
set -euo pipefail
## When started without arguments, or with "-h", "--help", "-help" or
## "help", print usage.
if [ $# -eq 0 ] || [ "x$1" = "x-h" ] || [ "x$1" = "x--help" ] ||
   [ "x$1" = "x-help" ] || [ "x$1" = "xhelp" ]; then
echo " mmCIF file format validation tool."
echo "------------------------------------------"
echo "Provided by SWISS-MODEL / Schwede group"
echo "(swissmodel.expasy.org / schwedelab.org)"
echo ""
echo "This container checks that mmCIF files are"
echo "properly formatted according to the"
echo "MAX/ mmCIF dictionary. At the moment,"
echo "there is one tool available that acts as a"
echo "command line tool: validate-mmcif-file."
echo "For further usage information, call this"
echo "container executing"
echo "'validate-mmcif-file --help'."
exit 1
fi
exec "$@"
# LocalWords: euo pipefail eq Schwede schwedelab mmcif fi
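get-mmcif-dict-versions.py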
#! /usr/local/bin/python
"""Get version and location of relevant mmCIF dictionaries for ModelCIF.
Fetch the versions of the ModelCIF dictionary and of the PDBx/mmCIF
dictionary it was built from, and write them to a JSON file.
"""
# pylint: disable=invalid-name
# pylint: enable=invalid-name
import argparse
import sys
import rapidjson as json
from mmcif.io.IoAdapterPy import IoAdapterPy
def _parse_command_line():
"""Get arguments."""
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
"dic_file",
type=str,
metavar="<DICTIONARY FILE>",
help="The mmCIF dictionary file to read the versions from.",
)
parser.add_argument(
"--parent",
"-p",
type=str,
metavar="<NAME OF PARENT DICT>",
help="Name of to the 'parent' dictionary. This is the one the other "
+ "dictionary is appended to. This is usually the mmcif_pdbx_v50.dic.",
default="mmcif_pdbx_v50.dic",
)
parser.add_argument(
"--output",
"-o",
type=str,
metavar="<PATH TO VERSION FILE>",
help="Path to store the JSON file with the version at.",
default="mmcif_ma_version.json",
)
parser.add_argument(
"--parent-location",
"-u",
type=str,
metavar="<URL OF PARENT DICT FILE>",
help="Download location of the parent dictionary file.",
default=None,
)
parser.add_argument(
"--child-location",
"-l",
type=str,
metavar="<URL OF CHILD DICT FILE>",
help="Download location of the child dictionary file.",
default=None,
)
opts = parser.parse_args()
return opts
def _error(msg):
"""Print a final error message."""
print(msg + "\nAborting.", file=sys.stderr)
sys.exit(1)
def _get_data_cat(cat, file_name, data):
"""Get a data category from a mmCIF data blob."""
obj = data.getObj(cat)
if obj is None:
_error(f"No '{cat}' object found in '{file_name}'.")
return obj
def _get_data_item(itm, cat, file_name, cat_data):
"""Get a single data item from a data category."""
val = cat_data.getAttributeValueList(itm)
if len(val) != 1:
_error(
f"Expected exactly 1 '{cat}.{itm}' in '{file_name}', "
+ f"found '{', '.join(val)}'."
)
return val[0]
def _get_versions(dic_file, parent_name, io_adapter):
"""Fetch the 'category_group_list' object and assemble a version for the
dictionary."""
dic = io_adapter.readFile(inputFilePath=dic_file)
# fetch a data container from the list returned by the adapter
cntnr = None
for obj in dic:
if "dictionary" in obj.getObjNameList():
cntnr = obj
break
if cntnr is None:
_error(f"No 'dictionary' object found in '{dic_file}'.")
dic = _get_data_cat("dictionary", dic_file, cntnr)
vrsn = _get_data_item("version", "dictionary", dic_file, dic)
ttl = _get_data_item("title", "dictionary", dic_file, dic)
dic_version = {"title": ttl, "version": vrsn}
cmp = _get_data_cat("pdbx_dictionary_component", dic_file, cntnr)
dc_idx = cmp.getAttributeIndex("dictionary_component_id")
    vs_idx = cmp.getAttributeIndex("version")
    # Fall back to an empty version in case the parent dictionary is not
    # listed as a dictionary component.
    prnt_version = {"title": parent_name, "version": None}
    for row in cmp:
        if row[dc_idx] == parent_name:
            prnt_version = {"title": parent_name, "version": row[vs_idx]}
            break
return dic_version, prnt_version
def _add_dict_location(parent, child, parent_loc, child_loc):
"""Add URLs to the dictionary versions if available."""
if parent_loc is None:
parent["location"] = "."
else:
parent["location"] = parent_loc
if child_loc is None:
child["location"] = "."
else:
child["location"] = child_loc
def _main():
"""Run as script."""
opts = _parse_command_line()
io_adapter = IoAdapterPy(False, sys.stdout)
c_vrsn, p_vrsn = _get_versions(opts.dic_file, opts.parent, io_adapter)
_add_dict_location(
p_vrsn, c_vrsn, opts.parent_location, opts.child_location
)
with open(opts.output, "w", encoding="utf8") as jfh:
json.dump({"versions": [p_vrsn, c_vrsn]}, jfh)
if __name__ == "__main__":
_main()
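# Example call (as wired up in the Dockerfile above, URLs abbreviated):
#   get-mmcif-dict-versions --parent-location <URL>/mmcif_pdbx_v50.dic \
#       --child-location <URL>/mmcif_ma.dic mmcif_ma.dic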
# LocalWords: DictToSdb SDB PDBx CifCheck pylint mmcif pdbx dic nAborting
# LocalWords: macromolecular utf
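validate-mmcif-file.py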
#! /usr/local/bin/python
"""Validate mmCIF format in a model mmCIF file.
Does not check if the model/ coordinates make sense. But includes associated
cif files in the check by merging files. That is, as an example, associated
files with quality scores stored in mmCIF format will be merged with the model
file and checked, but associated MSA files in FASTA format can not be merged
and thus, won't be merged into the model mmCIF file and won't be checked.
"""
# pylint: disable=invalid-name
# pylint: enable=invalid-name
import argparse
import atexit
import copy
import os
import subprocess
import sys
import tempfile
import rapidjson as json
from mmcif.api.DataCategory import DataCategory
from mmcif.api.PdbxContainers import DataContainer
from mmcif.io.PdbxReader import PdbxReader
from mmcif.io.PdbxWriter import PdbxWriter
def _parse_command_line():
"""Get arguments."""
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
"model_cif",
type=str,
metavar="<MODEL MMCIF FILE>",
help="Path to the model mmCIF file. This is the 'main' cif file of a "
+ "modelling project including coordinates.",
)
parser.add_argument(
"--associates-dir",
"-a",
type=str,
metavar="<DIR>",
help="Path to associated files, needed when the mmCIF file has "
+ "external files attached.",
default=None,
)
parser.add_argument(
"--dict-sdb",
"-d",
type=str,
metavar="<SDB FILE>",
help="The dictionary in SDB format used for checking.",
default="/usr/local/share/mmcif-dict-suite/mmcif_ma.sdb",
)
parser.add_argument(
"--out-file",
"-o",
type=str,
metavar="<JSON FILE>",
help="Write the JSON output to file. Default is to write to stdout.",
default=None,
)
parser.add_argument(
"--extend-validated-file",
"-e",
nargs="?",
const=" same ",
metavar="<FILE>",
help="Extend a positively validated mmCIF file with dictionary "
+ "versions. If invoked without argument, write to the input model "
+ "mmCIF file, otherwise specify a file name. Please note, the "
+ "dictionary will not be added to the mmCIF file if there are any "
+ "issues.",
default=None,
)
parser.add_argument(
"--verbose",
"-v",
action="store_true",
help="Write some messages to stdout instead of just having it as JSON. "
+ "Useful for debugging on the command line.",
)
opts = parser.parse_args()
# post process arguments
if opts.extend_validated_file is not None:
if opts.extend_validated_file == " same ":
opts.extend_validated_file = opts.model_cif
return opts
def _error(msg):
"""Print a final error message."""
print(msg + "\nAborting.", file=sys.stderr)
sys.exit(1)
def _warn(msg):
"""Pritn a warning message."""
print(f"WARNING: {msg}", file=sys.stderr)
def _parse_cifcheck_stderr(stderr):
"""Parse the error output of CifCheck."""
error_lst = []
for line in stderr.splitlines():
if line.startswith("Message: "):
error_lst.append(line[10:].rsplit('"', 1)[0])
elif line.startswith("cp: "):
error_lst.append(line[4:])
else:
_error(f"Unknown error output found: '{line}'")
return error_lst
def _parse_parser_file(filename):
"""Parse the parser output file of CifCheck."""
parserfile = filename + "-parser.log"
if not os.path.exists(parserfile):
return []
error_lst = []
with open(parserfile, encoding="utf-8") as dfh:
for line in dfh:
line = line.strip()
error_lst.append(line)
# remove the diag file
os.unlink(parserfile)
return error_lst
def _parse_diag_file(filename):
"""Parse the diagnosis file of CifCheck."""
# CifCheck places the diag file in the cwd.
diagfile = filename + "-diag.log"
if not os.path.exists(diagfile):
return []
error_lst = []
# CifCheck outputs diag files as iso-8859
with open(diagfile, encoding="iso-8859-1") as dfh:
for line in dfh:
line = line.strip()
if line == "":
continue
error_lst.append(line)
# remove the diag file
os.unlink(diagfile)
return error_lst
class _CifCheckFailedError(RuntimeError):
"""Raise for failed CifCheck runs but include error messages."""
def __init__(self, cifcheck_cmd, error_lst):
"""Create an exception"""
super().__init__(f"CifCheck failed for {' '.join(cifcheck_cmd)}")
self.cifcheck_errors = error_lst
self.cifcheck_cmd = cifcheck_cmd
def _read_mmcif(filepath):
"""Read a mmCIF file"""
data_lst = []
with open(filepath, encoding="utf-8") as ifh:
prd = PdbxReader(ifh)
prd.read(data_lst)
return data_lst
def _write_mmcif(filepath, cif_data):
"""Write data to mmCIF file"""
with open(filepath, "w", encoding="ascii") as ofh:
cifwriter = PdbxWriter(ofh)
# save a lot of whitespaces!
cifwriter.setAlignmentFlag(flag=False)
cifwriter.write(cif_data)
def _get_entry_id(cif_datablock, entry_id_map, datablock_idx):
"""Get a mapping of the entry.id from a cif datablock."""
entry = cif_datablock.getObj("entry")
if entry is not None:
eidx = entry.getAttributeIndex("id")
if eidx != -1:
for row in entry:
entry_id_map[row[eidx]] = datablock_idx
def _get_associated_files(model_cif_file):
"""Get the list of associated files from a model cif file."""
# This is an intermediate step, so we do not need to check/ report anything
# here. The actual confirmation comes out of CifCheck at a later stage.
mdl_cif = _read_mmcif(model_cif_file)
entry_id_map = {}
assoc_files = []
idxs = {}
for i, pdbx_cntnr in enumerate(mdl_cif):
# gather entry.id's for later
_get_entry_id(pdbx_cntnr, entry_id_map, i)
meafs = pdbx_cntnr.getObj("ma_entry_associated_files")
# If ma_entry_associated_files is not present then
# ma_associated_archive_file_details can't exist either since it has a
# ma_entry_associated_files.id relation. (CifCheck should notice that.)
if meafs is None:
continue
not_found = False
for j in ["file_format", "file_url", "entry_id"]:
idxs[j] = meafs.getAttributeIndex(j)
if idxs[j] == -1:
not_found = True
break
if not_found:
continue
for row in meafs:
if row[idxs["file_format"]] != "cif":
continue
assoc_files.append((row[idxs["file_url"]], row[idxs["entry_id"]]))
# make sure entry_id is matching in associated file!
maafd = pdbx_cntnr.getObj("ma_associated_archive_file_details")
if maafd is None:
continue
idxs["file_format"] = maafd.getAttributeIndex("file_format")
if idxs["file_format"] == -1:
continue
for row in maafd:
if row[idxs["file_format"]] == "cif":
raise NotImplementedError(
"Fetching associated cif files from archive."
)
return assoc_files, mdl_cif, entry_id_map
def _cmp_cif_rows(a_row, b_row, a_idxs, b_idxs, attrs):
"""Compare two cif rows by given attributes"""
for i in attrs:
if a_row[a_idxs[i]] != b_row[b_idxs[i]]:
return False
return True
def _add_row(row, src_idxs, dest_idxs, dest, attrs_l):
"""Add a data row to an existing datablock with the right item order."""
# create a new row fitting dest's order
new_row = list("?" * attrs_l)
for i, j in src_idxs.items():
new_row[dest_idxs[i]] = row[j]
dest.append(new_row)
def _add_or_extend_rows(src, dest, common, not_in_dest):
"""Mix/ add rows from src into dest."""
# extend dest with new attributes
for attr in not_in_dest:
dest.appendAttribute(attr)
s_idx = src.getAttributeIndexDict()
d_idx = dest.getAttributeIndexDict()
attrs_l = len(d_idx)
d_rows = list(range(len(dest)))
for src_row in src:
match = False
for i in d_rows:
dest_row = dest[i]
match = _cmp_cif_rows(src_row, dest_row, s_idx, d_idx, common)
if match:
# extend with missing data items
for attr in not_in_dest:
dest_row.append(src_row[s_idx[attr]])
d_rows.remove(i)
break
if not match:
_add_row(src_row, s_idx, d_idx, dest, attrs_l)
# extend dest rows that never matched with "?" as default value
for i in d_rows:
dest_row = dest[i]
for attr in not_in_dest:
dest_row.append("?")
def _merge_cif_datacontainer(
parent_datablock, datablock, exclude_categories=None
):
"""Merge datablock into parent_datablock ignoring exclude_categories."""
for category in datablock.getObjNameList():
if category in exclude_categories:
continue
db_ctgry = datablock.getObj(category)
# check if the data category exists in parent
if parent_datablock.exists(category):
p_ctgry = parent_datablock.getObj(category)
# compare items
not_in_p, in_both, _ = db_ctgry.cmpAttributeNames(p_ctgry)
_add_or_extend_rows(db_ctgry, p_ctgry, in_both, not_in_p)
else:
# data category does not exist in parent, append it to datablock
parent_datablock.append(db_ctgry)
def _try_os_remove(path):
"""Try to remove a file, don't complain if that fails."""
try:
os.remove(path)
except: # pylint: disable=bare-except
pass
def _merge_cif_data(
model_cif_data, associated_path, row_entry_id, entry_id_map
):
"""Merge contents of an associated file into cif data."""
error_msgs = {"cifcheck-errors": []}
assoc_cif = _read_mmcif(associated_path)
# per datablock, check to which datablock it belongs in the parent cif
for assoc_cntnr in assoc_cif:
# check/ get 'entry_link'
assoc_entry_link = assoc_cntnr.getObj("entry_link")
if assoc_entry_link is None:
error_msgs["cifcheck-errors"].append(
'ERROR - category "entry_link" is mandatory, but it is not '
+ f'present in datablock "{assoc_cntnr.getName()}"'
)
continue
# make sure entry_id exists for entry_link
entry_id_idx = assoc_entry_link.getAttributeIndex("entry_id")
if entry_id_idx == -1:
error_msgs["cifcheck-errors"].append(
f'ERROR - In block "{assoc_cntnr.getName()}", mandatory item '
+ '"entry_id" is not in category "entry_link"'
)
continue
# For each entry_id, look up the corresponding datablock in
# model_cif_data and merge with that datablock.
for row in assoc_entry_link:
entry_id = row[entry_id_idx]
if entry_id != row_entry_id:
error_msgs["cifcheck-errors"].append(
f'ERROR - In block "{assoc_cntnr.getName()}", item '
+ '"entry_id" does not match item '
+ '"ma_entry_associated_files.entry_id"'
)
continue
_merge_cif_datacontainer(
model_cif_data[entry_id_map[entry_id]],
assoc_cntnr,
exclude_categories=["entry_link"],
)
return error_msgs
class _CifCheck:
"""Handling the CifCheck tool."""
def __init__(self, dict_sdb, json_out_file=None, verbose=False):
self._version = None
self.check_results = {}
self.dict_sdb = os.path.abspath(dict_sdb)
self.json_out_file = json_out_file
self.verbose = verbose
@property
def version(self):
"""Get version dictionary if available"""
if self._version is not None:
return self._version
vrsn_file = os.path.splitext(self.dict_sdb)[0] + "_version.json"
try:
with open(vrsn_file, "r", encoding="utf-8") as jfh:
vrsn = json.load(jfh)
except FileNotFoundError:
self._version = {"version": [{"title": None, "version": None}]}
self.add_general_error(
f"Version JSON file not found at '{vrsn_file}'"
)
else:
self._version = vrsn
return self._version
def add_general_error(self, msg):
"""Add a uncategorised error to the list."""
if "errors" not in self.check_results:
self.check_results["errors"] = [msg]
else:
self.check_results["errors"].append(msg)
def _execute(self, filepath):
"""Execute the CifCheck tool on a model mmCIF file."""
        # If permission errors occur with the source directory of the CIF
        # file, consider copying the file to a path generated via Python's
        # tempfile module; that also deals with missing $TMP, $TEMP, etc.
        # variables. At the moment, the cwd is switched to the source
        # directory since CifCheck would copy the file there otherwise.
cifcheck_filepath = os.path.basename(filepath)
cifcheck_cmd = [
"CifCheck",
"-dictSdb",
self.dict_sdb,
"-f",
cifcheck_filepath,
]
cifcheck_wd = os.path.dirname(os.path.abspath(filepath))
cps = subprocess.run(
cifcheck_cmd,
stdout=subprocess.DEVNULL,
stderr=subprocess.PIPE,
check=False,
universal_newlines=True,
cwd=cifcheck_wd,
)
error_lst = []
        # get error messages from the command line; CifCheck writes its
        # parser and diag logs to its cwd
        if cps.returncode != 0:
            error_lst = _parse_cifcheck_stderr(cps.stderr)
            error_lst.extend(
                _parse_parser_file(
                    os.path.join(cifcheck_wd, cifcheck_filepath)
                )
            )
            raise _CifCheckFailedError(cifcheck_cmd, error_lst)
        # get messages from the diagnosis file
        error_lst.extend(
            _parse_diag_file(os.path.join(cifcheck_wd, cifcheck_filepath))
        )
return error_lst
def run(self, cif_file):
"""Run CifCheck for a given file and catch the output.
Returns False if the CifCheck execution itself failed."""
try:
format_errors = self._execute(cif_file)
except _CifCheckFailedError as exc:
if self.verbose:
_warn("failed to run CifCheck, Stopping.")
for line in exc.args:
print(line, file=sys.stderr)
print("CifCheck errors:", file=sys.stderr)
for line in exc.cifcheck_errors:
print(" ", line, file=sys.stderr)
self.check_results["status"] = "aborted"
self.check_results["cifcheck-command"] = " ".join(exc.cifcheck_cmd)
self.check_results["cifcheck-errors"] = exc.cifcheck_errors
return False
self.check_results["status"] = "completed"
self.check_results["diagnosis"] = format_errors
return True
def got_issues(self):
"""Query if there are parser or diagnosis messages."""
if (
"diagnosis" in self.check_results
and len(self.check_results["diagnosis"]) > 0
):
return True
if (
"cifcheck-errors" in self.check_results
and len(self.check_results["cifcheck-errors"]) > 0
):
return True
return False
def _update_audit_conform(self, ac_cat):
"""Update an existing audit_conform category entry."""
# check if name is there, if not, append
nm_idx = ac_cat.getAttributeIndex("dict_name")
vs_idx = ac_cat.getAttributeIndex("dict_version")
lc_idx = ac_cat.getAttributeIndex("dict_location")
for dct in self.version["versions"]:
found = False
for itm in ac_cat:
if dct["title"] == itm[nm_idx]:
itm[vs_idx] = dct["version"]
itm[lc_idx] = dct["location"]
found = True
break
if not found:
new_ac = [""] * 3
new_ac[nm_idx] = dct["title"]
new_ac[vs_idx] = dct["version"]
new_ac[lc_idx] = dct["location"]
ac_cat.append(new_ac)
def _add_audit_conform(self, pdbx_cntnr, mdl_cif, container_idx):
"""Add audit_conform category entry to data container."""
ac_cat = DataCategory(
"audit_conform",
["dict_name", "dict_version", "dict_location"],
[
[x["title"], x["version"], x["location"]]
for x in self.version["versions"]
],
)
# We want nicely formatted cif files, so place audit_conform
# after entry.
objs = pdbx_cntnr.getObjCatalog()
names = list(objs.keys())
pdbx_cntnr = DataContainer(pdbx_cntnr.getName())
found = False
while len(names) > 0:
nme = names.pop(0)
pdbx_cntnr.append(objs[nme])
if nme == "entry":
pdbx_cntnr.append(ac_cat)
found = True
break
for nme in names:
pdbx_cntnr.append(objs[nme])
if not found:
pdbx_cntnr.append(ac_cat)
mdl_cif[container_idx] = pdbx_cntnr
def add_versions_to_mmcif_file(self, mdl_cif, dest_file):
"""Add versions of mmCIF dictionaries to a mmCIF file.
:param mdl_cif: CIF data to be equipped with version data.
:type mdl_cif: :class:`list` of DataContainer
:param dest_file: Path to write the modified file to.
:type dest_file: :class:`str`
"""
# add/ modify audit_conform category
for i, pdbx_cntnr in enumerate(mdl_cif):
ac_cat = pdbx_cntnr.getObj("audit_conform")
if ac_cat is not None:
self._update_audit_conform(ac_cat)
else:
self._add_audit_conform(pdbx_cntnr, mdl_cif, i)
# write modified mmCIF containers to file
_write_mmcif(dest_file, mdl_cif)
return mdl_cif
def to_json(self):
"""Get CifCheck results as JSON."""
self.check_results.update(self.version)
return json.dumps(self.check_results)
def make_json_output(self):
"""Dump JSON results of CifCheck either as file or print to stdout."""
if self.verbose:
print("=============== CifCheck Errors ==============")
if "cifcheck-errors" in self.check_results:
for line in self.check_results["cifcheck-errors"]:
print(line)
sys.stdout.write("\n")
print("============= CifCheck Diagnosis =============")
if "diagnosis" in self.check_results:
for line in self.check_results["diagnosis"]:
print(line)
sys.stdout.write("\n")
json_data = self.to_json()
if self.verbose or not self.json_out_file:
print(json_data)
if self.json_out_file is not None:
with open(self.json_out_file, "w", encoding="utf-8") as jfh:
jfh.write(json_data)
def add_to_results(self, msgs):
"""Add messages to the CifCheck results"""
if "cifcheck-errors" not in self.check_results:
self.check_results["cifcheck-errors"] = msgs["cifcheck-errors"]
else:
self.check_results["cifcheck-errors"].extend(
msgs["cifcheck-errors"]
)
def _find_utf(line):
"""Try to find a word with an UTF character in a string."""
for i, ltr in enumerate(line):
try:
ltr.encode("ascii", "strict")
except UnicodeEncodeError:
return i
return None
def _file_has_utf(filename, cifcheck):
"""Check a file to not contain UTF characters as mmCIF only allows ASCII."""
with open(filename, encoding="utf-8") as cfh:
        for i, line in enumerate(cfh, start=1):
            try:
                line.encode("ascii", "strict")
            except UnicodeEncodeError:
                idx = _find_utf(line)
                cifcheck.add_general_error(
                    "File is non-ASCII as it has a UTF character in line "
                    + f"{i}, index {idx}."
                )
return True
return False
def _main():
"""Run as script"""
opts = _parse_command_line()
# set up the CifCheck tool
cifcheck = _CifCheck(opts.dict_sdb, opts.out_file, opts.verbose)
if _file_has_utf(opts.model_cif, cifcheck):
cifcheck.make_json_output()
sys.exit(1)
    # do a first check of the model cif alone to make sure it's actually cif
success = cifcheck.run(opts.model_cif)
if not success:
cifcheck.make_json_output()
sys.exit(1)
# check for associated files referenced by the model cif file
assoc_files, model_cif_data, entry_id_map = _get_associated_files(
opts.model_cif
)
# save original data for later
if opts.extend_validated_file is not None:
o_model_cif_data = copy.deepcopy(model_cif_data)
# make sure associated files exist and merge all of them into the model
for assoc, entry_id in assoc_files:
assoc_path = os.path.join(opts.associates_dir, assoc)
        # CifCheck the file to make sure it's actually cif; diagnosis messages
        # do not matter at this point since an incomplete file is tested.
success = cifcheck.run(assoc_path)
if not success:
cifcheck.make_json_output()
sys.exit(1)
# merge the model.cif and the associated file
msgs = _merge_cif_data(
model_cif_data, assoc_path, entry_id, entry_id_map
)
cifcheck.add_to_results(msgs)
validate_file = opts.model_cif
if assoc_files:
# write merged data to disk, create tmp file, clean up when done
cfh, cfn = tempfile.mkstemp(suffix=".cif", text=True)
# register for deletion here and in cwd
atexit.register(_try_os_remove, cfn)
os.close(cfh)
_write_mmcif(cfn, model_cif_data)
validate_file = cfn
# validate file
success = cifcheck.run(validate_file)
if not success:
cifcheck.make_json_output()
sys.exit(1)
# upon request (-e) extend the ORIGINAL file (not the merged one)
if not cifcheck.got_issues() and opts.extend_validated_file is not None:
cifcheck.add_versions_to_mmcif_file(
o_model_cif_data, opts.extend_validated_file
)
# return JSON as file or to stdout
if opts.out_file and opts.verbose:
print(f"Writing results of CifCheck to '{opts.out_file}'")
cifcheck.make_json_output()
if __name__ == "__main__":
_main()
# LocalWords: cif MSA FASTA pylint stdout CifCheck param src str dest cwd