Skip to content
Snippets Groups Projects
validate-mmcif-file.py 32.26 KiB
#! /usr/local/bin/python
"""Validate mmCIF format in a model mmCIF file.

Does not check if the model/ coordinates make sense. But includes associated
cif files in the check by merging files. That is, as an example, associated
files with quality scores stored in mmCIF format will be merged with the model
file and checked, but associated MSA files in FASTA format can not be merged
and thus, won't be merged into the model mmCIF file and won't be checked.
"""
# pylint: disable=invalid-name
# pylint: enable=invalid-name

# ToDo: enable testing of gzipped files
# ToDo: add "modelcif-pedantic" mode, fail on categories that are technically
#       allowed but discouraged to be used, like _exptl
# ToDo: Remove pip installs which are in requirements.txt from Dockerfile

from io import TextIOWrapper
import argparse
import atexit
import copy
import os
import re
import subprocess
import sys
import tempfile
import zipfile

from validators import url as is_url
import rapidjson as json
import requests


from mmcif.api.DataCategory import DataCategory
from mmcif.api.PdbxContainers import DataContainer
from mmcif.io.PdbxReader import PdbxReader
from mmcif.io.PdbxWriter import PdbxWriter
import mmcif.io.PdbxExceptions


def _parse_command_line():
    """Build the command line parser and return the parsed options.

    The module docstring serves as program description. When
    ``--extend-validated-file`` is given without a file name, the option is
    resolved to the path of the input model file before returning.
    """
    # value argparse stores when '-e' comes without an argument
    same_as_input = " same "

    cli = argparse.ArgumentParser(description=__doc__)
    cli.add_argument(
        "model_cif",
        metavar="<MODEL MMCIF FILE>",
        type=str,
        help=(
            "Path to the model mmCIF file. This is the 'main' cif file of a "
            "modelling project including coordinates."
        ),
    )
    cli.add_argument(
        "--associates-dir",
        "-a",
        metavar="<DIR>",
        type=str,
        default=None,
        help=(
            "Path to associated files, needed when the mmCIF file has "
            "external files attached."
        ),
    )
    cli.add_argument(
        "--dict-sdb",
        "-d",
        metavar="<SDB FILE>",
        type=str,
        default="/usr/local/share/mmcif-dict-suite/mmcif_ma.sdb",
        help="The dictionary in SDB format used for checking.",
    )
    cli.add_argument(
        "--out-file",
        "-o",
        metavar="<JSON FILE>",
        type=str,
        default=None,
        help="Write the JSON output to file. Default is to write to stdout.",
    )
    cli.add_argument(
        "--extend-validated-file",
        "-e",
        metavar="<FILE>",
        nargs="?",
        const=same_as_input,
        default=None,
        help=(
            "Extend a positively validated mmCIF file with dictionary "
            "versions. If invoked without argument, write to the input model "
            "mmCIF file, otherwise specify a file name. Please note, the "
            "dictionary will not be added to the mmCIF file if there are any "
            "issues."
        ),
    )
    cli.add_argument(
        "--report",
        "-r",
        action="store_true",
        help=(
            "Print a concise report. Skips redundancies, may loose some "
            "information. Usable to verify your own writer on single files. "
            "Resolving problems of the report very often fixes the whole file."
        ),
    )
    cli.add_argument(
        "--verbose",
        "-v",
        action="store_true",
        help=(
            "Write some messages to stdout instead of just having it as JSON. "
            "Useful for debugging on the command line."
        ),
    )

    args = cli.parse_args()

    # post process arguments: '-e' without value means "overwrite the input"
    if args.extend_validated_file == same_as_input:
        args.extend_validated_file = args.model_cif

    return args


def _error(msg):
    """Write a final error message to stderr and exit with status 1."""
    final_text = msg + "\nAborting."
    print(final_text, file=sys.stderr)
    raise SystemExit(1)


def _warn(msg):
    """Print a warning message, prefixed with 'WARNING: ', to stderr."""
    sys.stderr.write(f"WARNING: {msg}\n")


def _parse_cifcheck_stderr(stderr):
    """Turn the stderr text of a CifCheck run into a list of messages.

    Lines starting with 'Message: ' and 'cp: ' are recognised; any other
    line ends the script via _error().
    """
    messages = []
    for entry in stderr.splitlines():
        if entry.startswith("Message: "):
            # skip the 10 leading characters, then cut at the last '"'
            payload = entry[10:]
            messages.append(payload.rsplit('"', 1)[0])
            continue
        if entry.startswith("cp: "):
            messages.append(entry[4:])
            continue
        _error(f"Unknown error output found: '{entry}'")

    return messages


def _parse_parser_file(filename):
    """Collect the lines of the CifCheck parser log belonging to filename.

    The log is expected at '<filename>-parser.log'. If it does not exist, an
    empty list is returned. Otherwise every line is returned stripped of
    surrounding whitespace and the log file is deleted afterwards.
    """
    log_path = filename + "-parser.log"
    if not os.path.exists(log_path):
        return []

    with open(log_path, encoding="utf-8") as log_fh:
        messages = [raw.strip() for raw in log_fh]

    # the parser log is not needed anymore once its content is read
    os.unlink(log_path)

    return messages


def _parse_diag_file(filename):
    """Collect the non-empty lines of the CifCheck diagnosis log.

    The log is expected at '<filename>-diag.log' (CifCheck places the diag
    file in the cwd). If it does not exist, an empty list is returned.
    Otherwise the stripped, non-empty lines are returned and the log file is
    deleted afterwards.
    """
    log_path = filename + "-diag.log"
    if not os.path.exists(log_path):
        return []

    # CifCheck outputs diag files as iso-8859
    with open(log_path, encoding="iso-8859-1") as log_fh:
        stripped = (raw.strip() for raw in log_fh)
        messages = [text for text in stripped if text != ""]

    # the diag file is not needed anymore once its content is read
    os.unlink(log_path)

    return messages


class _CifCheckFailedError(RuntimeError):
    """Signal a failed CifCheck run, carrying command and error messages."""

    def __init__(self, cifcheck_cmd, error_lst):
        """Store the command (list of arguments) and the collected errors."""
        command_line = " ".join(cifcheck_cmd)
        super().__init__(f"CifCheck failed for {command_line}")
        self.cifcheck_cmd = cifcheck_cmd
        self.cifcheck_errors = error_lst


def _read_mmcif(filepath_or_object):
    """Read mmCIF data with PdbxReader and return the list it fills.

    A str argument is treated as path and opened as UTF-8 text; anything
    else is handed to PdbxReader as is (e.g. an already open file object).
    """
    containers = []
    if not isinstance(filepath_or_object, str):
        PdbxReader(filepath_or_object).read(containers)
        return containers

    with open(filepath_or_object, encoding="utf-8") as cif_fh:
        PdbxReader(cif_fh).read(containers)

    return containers


def _write_mmcif(filepath, cif_data):
    """Write cif_data to filepath as ASCII text using PdbxWriter."""
    with open(filepath, "w", encoding="ascii") as out_fh:
        writer = PdbxWriter(out_fh)
        # switching off alignment saves a lot of whitespaces in the output
        writer.setAlignmentFlag(flag=False)
        writer.write(cif_data)


def _get_indeces(data_category, attribute_list):
    """Map attribute names to their column index in data_category.

    Returns an empty dictionary as soon as one attribute is reported as
    missing (index -1) by the category.
    """
    columns = {}
    for name in attribute_list:
        position = data_category.getAttributeIndex(name)
        if position == -1:
            return {}
        columns[name] = position

    return columns


def _get_entry_id(cif_datablock, entry_id_map, datablock_idx):
    """Register every entry.id of a cif datablock in entry_id_map.

    Each id found in the "entry" category is mapped to datablock_idx.
    entry_id_map is modified in place; nothing happens if the category or
    its "id" attribute is missing.
    """
    entry_cat = cif_datablock.getObj("entry")
    if entry_cat is None:
        return

    id_column = entry_cat.getAttributeIndex("id")
    if id_column == -1:
        return

    for record in entry_cat:
        entry_id_map[record[id_column]] = datablock_idx


def _download_file(file_url):
    """Download a file into a temporary file and return the file object.

    The returned object comes from tempfile.TemporaryFile(), positioned at
    the start of the data; the temporary file vanishes once the object is
    closed.

    :param file_url: URL to fetch the file from.
    :type file_url: :class:`str`

    :raises RuntimeError: If the server does not answer with an OK status.
    """
    # Use the response as context manager: with stream=True the connection
    # stays open until the body is consumed or the response is closed, so
    # the error path below would leak it otherwise.
    with requests.get(file_url, stream=True, timeout=600) as rspns:
        if not rspns.ok:
            raise RuntimeError(f"File not found by URL '{file_url}'.")

        dlf = tempfile.TemporaryFile()
        try:
            for chunk in rspns.iter_content(chunk_size=1024):
                dlf.write(chunk)
            dlf.seek(0)
        except Exception:
            # do not leave a half written temporary file open
            dlf.close()
            raise

    return dlf


def _get_assoc_obj(file_or_url, assoc_dir):
    """Get an associated file, either as local path or as downloaded file.

    If file_or_url exists inside assoc_dir, the joined path is returned.
    Otherwise, if it is a URL, the result of _download_file() is returned.
    In any other case a RuntimeError is raised.
    """
    if assoc_dir is not None:
        local_path = os.path.join(assoc_dir, file_or_url)
        if os.path.exists(local_path):
            return local_path

    if is_url(file_or_url):
        return _download_file(file_or_url)

    raise RuntimeError(
        "Associated file path does not point to actual file or URL: "
        + f"'{assoc_dir}/{file_or_url}'"
    )


def _get_arc_zipfile_handle(arc_file, assoc_dir):
    """Open an associated archive, return the ZipFile and its member names.

    The caller is responsible for closing the returned ZipFile object.
    """
    # The ZipFile object outlives this function, so a context manager (as
    # PyLint suggests) is not an option here.
    # pylint: disable=consider-using-with
    archive = zipfile.ZipFile(_get_assoc_obj(arc_file, assoc_dir))
    member_names = archive.namelist()

    return archive, member_names


def _unzip_arc_cif(arc_zip, cif_file):
    """Read the mmCIF data of member cif_file from an open ZIP archive."""
    # the archive member is binary, wrap it to get UTF-8 decoded text
    with TextIOWrapper(arc_zip.open(cif_file), encoding="utf-8") as text_fh:
        return _read_mmcif(text_fh)


def _get_associated_files(model_cif_file, assoc_dir, cifcheck):
    """Get the list of associated files from a model cif file.

    Reads the model file, collects its entry IDs and loads every associated
    file in CIF format, either stored directly in assoc_dir or packed into an
    associated ZIP archive. Non-CIF files listed for an archive are only
    checked for presence in the archive; missing ones are reported via
    cifcheck.add_general_error().

    :param model_cif_file: Path to the model mmCIF file.
    :type model_cif_file: :class:`str`
    :param assoc_dir: Directory with the associated files, may be None if
                      the model does not reference local CIF files.
    :type assoc_dir: :class:`str`
    :param cifcheck: Collector for general errors.
    :type cifcheck: :class:`_CifCheck`

    :returns: Tuple of: list of (CIF data, entry ID) per associated CIF file,
              the CIF data of the model (None if the model file has syntax
              errors), mapping of entry ID to datablock index.
    :raises RuntimeError: If a CIF file is referenced but assoc_dir is None
                          or an archived CIF file has unknown file content.
    """
    # This is an intermediate step, so we do not need to check/ report anything
    # here. The actual confirmation comes out of CifCheck at a later stage.
    entry_id_map = {}
    assoc_files = []
    try:
        mdl_cif = _read_mmcif(model_cif_file)
    except mmcif.io.PdbxExceptions.PdbxSyntaxError:
        return assoc_files, None, entry_id_map

    archives = {}
    for i, pdbx_cntnr in enumerate(mdl_cif):
        # gather entry.id's for later
        _get_entry_id(pdbx_cntnr, entry_id_map, i)
        dat_cat = pdbx_cntnr.getObj("ma_entry_associated_files")
        # If ma_entry_associated_files is not present then
        # ma_associated_archive_file_details can't exist either since it has a
        # ma_entry_associated_files.id relation. (CifCheck should notice that.)
        if dat_cat is None:
            continue
        idxs = _get_indeces(
            dat_cat, ["entry_id", "file_format", "file_type", "file_url", "id"]
        )
        if not idxs:
            continue
        for row in dat_cat:
            if row[idxs["file_type"]] == "archive":
                archives[row[idxs["id"]]] = (
                    row[idxs["file_url"]],
                    row[idxs["entry_id"]],
                )
            if row[idxs["file_format"]] != "cif":
                continue
            # this should be easy to make reading URLs, using
            # _get_assoc_obj(row[idxs["file_url"]], assoc_dir) but for now
            # I have not seen the use case (no web server at hand for testing)
            if assoc_dir is None:
                # fail with a readable message instead of a TypeError raised
                # by os.path.join()
                raise RuntimeError(
                    "Associated file can not be read as no directory with "
                    + f"associated files is given: '{row[idxs['file_url']]}'"
                )
            data = _read_mmcif(os.path.join(assoc_dir, row[idxs["file_url"]]))
            assoc_files.append((data, row[idxs["entry_id"]]))
        # make sure entry_id is matching in associated file!
        dat_cat = pdbx_cntnr.getObj("ma_associated_archive_file_details")
        if dat_cat is None:
            continue
        idxs = _get_indeces(
            dat_cat,
            ["archive_file_id", "file_content", "file_format", "file_path"],
        )
        # Incomplete categories are left to CifCheck for reporting, same as
        # for ma_entry_associated_files above.
        if not idxs:
            continue
        last_arc_id = None
        arc_zip = None
        arc_namelist = []
        try:
            for row in dat_cat:
                # Get a ZipFile object of the archive to read CIF files and
                # check the presence of non-CIF files.
                arc_id = row[idxs["archive_file_id"]]
                arc_file = archives[arc_id][0]
                if arc_zip is None or arc_id != last_arc_id:
                    last_arc_id = arc_id
                    if arc_zip is not None:
                        arc_zip.close()
                    arc_zip, arc_namelist = _get_arc_zipfile_handle(
                        arc_file, assoc_dir
                    )
                if row[idxs["file_format"]] == "cif":
                    if row[idxs["file_content"]] == "local pairwise QA scores":
                        cif_file = row[idxs["file_path"]]
                        data = _unzip_arc_cif(arc_zip, cif_file)
                        assoc_files.append((data, archives[arc_id][1]))
                    elif row[idxs["file_content"]] not in ["other"]:
                        raise RuntimeError(
                            "Unknown associated CIF file content "
                            + f"found: {row[idxs['file_content']]}"
                        )
                else:
                    if row[idxs["file_path"]] not in arc_namelist:
                        cifcheck.add_general_error(
                            f"ma_entry_associated_files.file_url '{arc_file}' "
                            + "is missing "
                            + "ma_associated_archive_file_details.file_path "
                            + f"'{row[idxs['file_path']]}'"
                        )
        finally:
            # Close the last archive, also on errors. There is none if the
            # category has no rows.
            if arc_zip is not None:
                arc_zip.close()

    return assoc_files, mdl_cif, entry_id_map


def _cmp_cif_rows(a_row, b_row, a_idxs, b_idxs, attrs):
    """Tell whether two cif rows carry equal values for all of attrs.

    a_idxs/ b_idxs map attribute names to the column index in the
    respective row.
    """
    return not any(a_row[a_idxs[name]] != b_row[b_idxs[name]] for name in attrs)


def _add_row(row, src_idxs, dest_idxs, dest, attrs_l):
    """Append row to dest, rearranged to the item order of dest.

    Columns of dest without a counterpart in row are filled with "?".
    """
    # start from a row of attrs_l placeholders, then drop in the known values
    reordered = ["?"] * attrs_l
    for name, src_column in src_idxs.items():
        reordered[dest_idxs[name]] = row[src_column]
    dest.append(reordered)


def _add_or_extend_rows(src, dest, common, not_in_dest):
    """Mix/ add rows from src into dest.

    dest first gains the attributes listed in not_in_dest. A src row which
    equals a dest row in all common attributes extends that dest row with
    the new items, any other src row is appended as new row. Original dest
    rows without a partner in src get "?" for the new items.
    """
    # extend dest with new attributes
    for name in not_in_dest:
        dest.appendAttribute(name)
    src_cols = src.getAttributeIndexDict()
    dest_cols = dest.getAttributeIndexDict()
    width = len(dest_cols)
    # indexes of the original dest rows that are still without a partner
    unmatched = list(range(len(dest)))
    for src_row in src:
        for position in unmatched:
            candidate = dest[position]
            if _cmp_cif_rows(src_row, candidate, src_cols, dest_cols, common):
                # extend with missing data items
                candidate.extend(src_row[src_cols[name]] for name in not_in_dest)
                unmatched.remove(position)
                break
        else:
            # no dest row matched, src_row becomes a row of its own
            _add_row(src_row, src_cols, dest_cols, dest, width)
    # extend dest rows that never matched with "?" as default value
    for position in unmatched:
        leftover = dest[position]
        for _ in not_in_dest:
            leftover.append("?")


def _merge_cif_datacontainer(
    parent_datablock, datablock, exclude_categories=None
):
    """Merge datablock into parent_datablock ignoring exclude_categories.

    Categories already present in the parent get their rows mixed/ extended,
    all other categories are appended to the parent as they are.

    :param parent_datablock: Datablock to merge into, modified in place.
    :param datablock: Datablock to take the categories from.
    :param exclude_categories: Names of categories to skip, None (default)
                               skips nothing.
    """
    # The default must not break the membership test below.
    if exclude_categories is None:
        exclude_categories = ()
    for category in datablock.getObjNameList():
        if category in exclude_categories:
            continue
        db_ctgry = datablock.getObj(category)
        # check if the data category exists in parent
        if parent_datablock.exists(category):
            p_ctgry = parent_datablock.getObj(category)
            # compare items
            not_in_p, in_both, _ = db_ctgry.cmpAttributeNames(p_ctgry)
            _add_or_extend_rows(db_ctgry, p_ctgry, in_both, not_in_p)
        else:
            # data category does not exist in parent, append it to datablock
            parent_datablock.append(db_ctgry)


def _try_os_remove(path):
    """Try to remove a file, don't complain if that fails.

    Only failures of the removal itself (OSError, e.g. file already gone or
    missing permissions) are ignored. Interrupts and exit requests are not
    swallowed.
    """
    try:
        os.remove(path)
    except OSError:
        pass


def _merge_cif_data(model_cif_data, assoc_cif, row_entry_id, entry_id_map):
    """Merge contents of an associated file into cif data.

    Every datablock of assoc_cif needs an "entry_link" category with an
    "entry_id" item equal to row_entry_id. Matching datablocks are merged
    into the datablock of model_cif_data that entry_id_map assigns to the
    entry ID, "entry_link" itself is left out. Returns a dictionary with the
    problems found, stored as list under "cifcheck-errors".
    """
    issues = []

    # per datablock, check to which datablock it belongs in the parent cif
    for assoc_block in assoc_cif:
        link_cat = assoc_block.getObj("entry_link")
        if link_cat is None:
            issues.append(
                'ERROR - category "entry_link" is mandatory, but it is not '
                + f'present in datablock "{assoc_block.getName()}"'
            )
            continue

        # entry_link without entry_id can not be assigned to a datablock
        id_column = link_cat.getAttributeIndex("entry_id")
        if id_column == -1:
            issues.append(
                f'ERROR - In block "{assoc_block.getName()}", mandatory item '
                + '"entry_id" is not in category "entry_link"'
            )
            continue

        # For each entry_id, look up the corresponding datablock in
        # model_cif_data and merge with that datablock.
        for link_row in link_cat:
            linked_id = link_row[id_column]
            if linked_id == row_entry_id:
                _merge_cif_datacontainer(
                    model_cif_data[entry_id_map[linked_id]],
                    assoc_block,
                    exclude_categories=["entry_link"],
                )
            else:
                issues.append(
                    f'ERROR - In block "{assoc_block.getName()}", item '
                    + '"entry_id" does not match item '
                    + '"ma_entry_associated_files.entry_id"'
                )

    return {"cifcheck-errors": issues}


class _CifCheck:
    """Handling the CifCheck tool.

    Runs the external ``CifCheck`` executable on a mmCIF file, gathers all
    messages in :attr:`check_results` and turns them into JSON output or a
    concise report.
    """

    def __init__(self, dict_sdb, json_out_file=None, verbose=False):
        """Set up storage for the results of a CifCheck run.

        :param dict_sdb: Path to the dictionary in SDB format, stored as
                         absolute path.
        :type dict_sdb: :class:`str`
        :param json_out_file: File to write JSON results to, None for stdout.
        :type json_out_file: :class:`str`
        :param verbose: Print additional messages.
        :type verbose: :class:`bool`
        """
        # cache for the version property
        self._version = None
        self.check_results = {
            "errors": [],
            "diagnosis": [],
            "cifcheck-errors": [],
        }
        self.dict_sdb = os.path.abspath(dict_sdb)
        self.json_out_file = json_out_file
        self.verbose = verbose

    @property
    def version(self):
        """Get version dictionary if available

        The data is read once from the JSON file next to the SDB file
        ('<dict_sdb without extension>_version.json'). If that file does not
        exist, a general error is recorded and a placeholder is returned
        which has the same layout the rest of the class relies on: a list
        under "versions" whose items have "title", "version" and "location".
        """
        if self._version is not None:
            return self._version
        vrsn_file = os.path.splitext(self.dict_sdb)[0] + "_version.json"
        try:
            with open(vrsn_file, "r", encoding="utf-8") as jfh:
                vrsn = json.load(jfh)
        except FileNotFoundError:
            # Key must be "versions" and items need a "location", otherwise
            # users of this property fail with a KeyError.
            self._version = {
                "versions": [
                    {"title": None, "version": None, "location": None}
                ]
            }
            self.add_general_error(
                f"Version JSON file not found at '{vrsn_file}'"
            )
        else:
            self._version = vrsn

        return self._version

    def add_general_error(self, msg):
        """Add a uncategorised error to the list."""
        self.check_results["errors"].append(msg)

    def _execute(self, filepath):
        """Execute the CifCheck tool on a model mmCIF file.

        :param filepath: Path to the mmCIF file to be checked.
        :type filepath: :class:`str`

        :returns: Messages of the diagnosis file written by CifCheck.
        :raises _CifCheckFailedError: If CifCheck writes to stderr or leaves
                                      a parser log.
        """
        # If permission errors occur with the source directory of the CIF file,
        # consider copying the file to a Python tempfile generated path. That
        # deals with missing $TMP, $TEMP, etc.... variables.
        # At the moment, cwd is switched to the source directory since CifCheck
        # copies the file, otherwise.
        cifcheck_filepath = os.path.basename(filepath)
        cifcheck_cmd = [
            "CifCheck",
            "-dictSdb",
            self.dict_sdb,
            "-f",
            cifcheck_filepath,
        ]
        cifcheck_wd = os.path.dirname(os.path.abspath(filepath))
        cps = subprocess.run(
            cifcheck_cmd,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
            check=False,
            universal_newlines=True,
            cwd=cifcheck_wd,
        )

        error_lst = []
        # get error messages on the command line
        error_lst.extend(_parse_cifcheck_stderr(cps.stderr))
        error_lst.extend(_parse_parser_file(filepath))
        if error_lst:
            raise _CifCheckFailedError(cifcheck_cmd, error_lst)

        # get messages from diagnosis file
        error_lst.extend(_parse_diag_file(filepath))

        return error_lst

    def run(self, cif_file):
        """Run CifCheck for a given file and catch the output.

        Stores "status" plus either the diagnosis messages or the command and
        its errors in :attr:`check_results`.

        Returns False if the CifCheck execution itself failed."""
        try:
            format_errors = self._execute(cif_file)
        except _CifCheckFailedError as exc:
            if self.verbose:
                _warn("failed to run CifCheck, Stopping.")
                for line in exc.args:
                    print(line, file=sys.stderr)
                print("CifCheck errors:", file=sys.stderr)
                for line in exc.cifcheck_errors:
                    print("  ", line, file=sys.stderr)

            self.check_results["status"] = "aborted"
            self.check_results["cifcheck-command"] = " ".join(exc.cifcheck_cmd)
            self.check_results["cifcheck-errors"] = exc.cifcheck_errors

            return False

        self.check_results["status"] = "completed"
        self.check_results["diagnosis"] = format_errors

        return True

    def got_issues(self):
        """Query if there are parser or diagnosis messages.

        General errors (key "errors") are not taken into account."""
        return bool(
            self.check_results.get("diagnosis")
            or self.check_results.get("cifcheck-errors")
        )

    def _update_audit_conform(self, ac_cat):
        """Update an existing audit_conform category entry.

        Rows of dictionaries already listed (by name) get version and
        location overwritten, other dictionaries are appended as new rows.
        """
        # check if name is there, if not, append
        nm_idx = ac_cat.getAttributeIndex("dict_name")
        vs_idx = ac_cat.getAttributeIndex("dict_version")
        lc_idx = ac_cat.getAttributeIndex("dict_location")
        for dct in self.version["versions"]:
            found = False
            for itm in ac_cat:
                if dct["title"] == itm[nm_idx]:
                    itm[vs_idx] = dct["version"]
                    itm[lc_idx] = dct["location"]
                    found = True
                    break
            if not found:
                new_ac = [""] * 3
                new_ac[nm_idx] = dct["title"]
                new_ac[vs_idx] = dct["version"]
                new_ac[lc_idx] = dct["location"]
                ac_cat.append(new_ac)

    def _add_audit_conform(self, pdbx_cntnr, mdl_cif, container_idx):
        """Add audit_conform category entry to data container.

        A new container with the same name and categories is built and put
        into mdl_cif at container_idx, replacing the old one.
        """
        ac_cat = DataCategory(
            "audit_conform",
            ["dict_name", "dict_version", "dict_location"],
            [
                [x["title"], x["version"], x["location"]]
                for x in self.version["versions"]
            ],
        )
        # We want nicely formatted cif files, so place audit_conform
        # after entry.
        objs = pdbx_cntnr.getObjCatalog()
        names = list(objs.keys())
        pdbx_cntnr = DataContainer(pdbx_cntnr.getName())
        found = False
        # copy categories up to and including "entry", then add audit_conform
        while len(names) > 0:
            nme = names.pop(0)
            pdbx_cntnr.append(objs[nme])
            if nme == "entry":
                pdbx_cntnr.append(ac_cat)
                found = True
                break
        # copy whatever comes after "entry"
        for nme in names:
            pdbx_cntnr.append(objs[nme])
        # without an "entry" category, audit_conform goes to the end
        if not found:
            pdbx_cntnr.append(ac_cat)
        mdl_cif[container_idx] = pdbx_cntnr

    def add_versions_to_mmcif_file(self, mdl_cif, dest_file):
        """Add versions of mmCIF dictionaries to a mmCIF file.

        :param mdl_cif: CIF data to be equipped with version data.
        :type mdl_cif: :class:`list` of DataContainer
        :param dest_file: Path to write the modified file to.
        :type dest_file: :class:`str`
        """
        # add/ modify audit_conform category
        for i, pdbx_cntnr in enumerate(mdl_cif):
            ac_cat = pdbx_cntnr.getObj("audit_conform")
            if ac_cat is not None:
                self._update_audit_conform(ac_cat)
            else:
                self._add_audit_conform(pdbx_cntnr, mdl_cif, i)

        # write modified mmCIF containers to file
        _write_mmcif(dest_file, mdl_cif)

        return mdl_cif

    def to_json(self):
        """Get CifCheck results as JSON.

        The version data is merged into :attr:`check_results` on the way."""
        self.check_results.update(self.version)
        return json.dumps(self.check_results)

    def make_json_output(self):
        """Dump JSON results of CifCheck either as file or print to stdout."""
        if self.verbose:
            print("=============== CifCheck Errors ==============")
            if "cifcheck-errors" in self.check_results:
                for line in self.check_results["cifcheck-errors"]:
                    print(line)
            sys.stdout.write("\n")
            print("============= CifCheck Diagnosis =============")
            if "diagnosis" in self.check_results:
                for line in self.check_results["diagnosis"]:
                    print(line)
            sys.stdout.write("\n")

        json_data = self.to_json()
        if self.verbose or not self.json_out_file:
            print(json_data)

        if self.json_out_file is not None:
            with open(self.json_out_file, "w", encoding="utf-8") as jfh:
                jfh.write(json_data)

    def add_to_results(self, msgs):
        """Add messages to the CifCheck results

        :param msgs: Messages stored as list under key "cifcheck-errors".
        :type msgs: :class:`dict`
        """
        if "cifcheck-errors" not in self.check_results:
            self.check_results["cifcheck-errors"] = msgs["cifcheck-errors"]
        else:
            self.check_results["cifcheck-errors"].extend(
                msgs["cifcheck-errors"]
            )

    def make_report(self):
        """Make a concise report out of the results.

        Be aware, that cuts away the majority of the messages. But solving those
        issues first, may already repair a mmCIF file.

        :raises NotImplementedError: If there are general errors, those are
                                     not supported by the report yet.
        :raises RuntimeError: If a diagnosis line matches none of the known
                              patterns or datablock names differ.
        """
        not_implemented = ["errors"]
        for category in not_implemented:
            if len(self.check_results[category]) > 0:
                raise NotImplementedError(
                    f"Results for category '{category}' not yet supported in "
                    + "report."
                )
        print("Report")
        print("======")
        print(f"Status of check: {self.check_results['status']}")
        if "versions" in self.check_results:
            print("CIF dictionaries used:")
            for dct in self.check_results["versions"]:
                print(f"   {dct['title']}/ {dct['version']}")
                print(f"   {dct['location']}")

        # condense diagnosis data
        rprt = {
            "missing_cats": set(),
            "missing_itms": set(),
            "parchild_mm": set(),
        }
        for line in self.check_results["diagnosis"]:
            # missing categories
            for pttrn in [
                r"^ERROR - category \"(?P<cat>.*)\" is mandatory, but it is "
                + r"not present in datablock \"(?P<dblock>.*)\"$",
                r"^\+\+ERROR - In block \"(?P<dblock>.*)\", parent category "
                + r"\"(?P<cat>.*)\", of category \".*\", is missing\.$",
            ]:
                match = re.match(pttrn, line)
                if match is not None:
                    rprt["missing_cats"].add(match.group("cat"))
                    _check_dblock_name(match.group("dblock"), rprt)
                    break
            if match is not None:
                continue
            # missing items
            for pttrn in [
                r"^ERROR - In block \"(?P<dblock>.*)\", mandatory "
                + r"item \"(?P<itm>.*)\" is not in category \"(?P<cat>.*)\"$"
            ]:
                match = re.match(pttrn, line)
                if match is not None:
                    rprt["missing_itms"].add(
                        f"{match.group('cat')}.{match.group('itm')}"
                    )
                    _check_dblock_name(match.group("dblock"), rprt)
                    break
            if match is not None:
                continue
            # parent-child issues
            match = re.match(
                r"^ERROR PARCHILD \".*\" - In block \"(?P<dblock>.*)\", in "
                + r"category \"(?P<chld>.*)\", in row\# \d+, unmatched value "
                + r"in the parent \"(?P<prnt>.*)\"$",
                line,
            )
            if match is not None:
                rprt["parchild_mm"].add(
                    f"{match.group('chld')}->{match.group('prnt')}"
                )
                _check_dblock_name(match.group("dblock"), rprt)
                continue
            match = re.match(
                r"^\"(?P<chld>.*)\" -> \"(?P<prnt>.*)\" value =(?P<vle>.*)$",
                line,
            )
            if match is not None:
                # prepare a string to be removed from parchild_mm: category
                # name without the leading character and without the item
                chld = match.group("chld").split(".")[0][1:]
                prnt = match.group("prnt").split(".")[0][1:]
                try:
                    rprt["parchild_mm"].remove(f"{chld}->{prnt}")
                except KeyError:
                    pass
                # add a more verbose line instead
                rprt["parchild_mm"].add(
                    f"{match.group('chld')}->{match.group('prnt')}, "
                    + f"value={match.group('vle')}"
                )
                continue
            # Unmatched lines need to be added to above evaluation
            raise RuntimeError(
                f'Unmatched diagnosis line found:\n"""{line}"""'
            )

        # print above evaluation in the report
        # datablock
        print("Diagnosis:")
        if "datablock" in rprt:
            print("   Datablock/ entry name:", rprt["datablock"])
        if len(rprt["missing_cats"]) > 0:
            print("   Missing categories:")
            for line in sorted(rprt["missing_cats"]):
                print(f"      {line}")
        if len(rprt["missing_itms"]) > 0:
            print("   Missing items:")
            for line in sorted(rprt["missing_itms"]):
                print(f"      {line}")
        if len(rprt["parchild_mm"]) > 0:
            print("   Mismatching parent/ child relationships:")
            for line in sorted(rprt["parchild_mm"]):
                print(f"      {line}")

        # print errors/ messages caught
        print("Errors by running CifCheck:")
        for line in self.check_results["cifcheck-errors"]:
            print(f"   {line}")


def _check_dblock_name(name, report):
    """Make sure a report deals with a single datablock name only.

    The first name seen is stored in report["datablock"]; any later name
    which differs raises a RuntimeError.
    """
    if "datablock" not in report:
        report["datablock"] = name
        return

    known = report["datablock"]
    if known != name:
        raise RuntimeError(
            "Two different datablock (names) found: " + f"{known} vs {name}"
        )


def _find_utf(line):
    """Return the index of the first non-ASCII character in a string.

    Returns None if the string is plain ASCII.
    """
    # code points above 127 are exactly those ASCII can not encode
    return next(
        (pos for pos, char in enumerate(line) if ord(char) > 127), None
    )


def _file_has_utf(filename, cifcheck):
    """Check a file to not contain UTF characters as mmCIF only allows ASCII.

    Stops at the first offending line and records a general error carrying
    the line number and character index, both counted from 0.

    :param filename: Path to the file to check.
    :type filename: :class:`str`
    :param cifcheck: Collector for the error message.
    :type cifcheck: :class:`_CifCheck`

    :returns: True if a non-ASCII character was found, False otherwise.
    """
    # 'surrogateescape' turns bytes which are not valid UTF-8 (e.g. from an
    # ISO-8859 encoded file) into lone surrogates instead of raising a
    # UnicodeDecodeError. Surrogates fail the ASCII test below like any other
    # non-ASCII character, so such files get reported and do not crash the
    # script.
    with open(filename, encoding="utf-8", errors="surrogateescape") as cfh:
        for i, line in enumerate(cfh):
            try:
                line.encode("ascii", "strict")
            except UnicodeEncodeError:
                idx = _find_utf(line)
                cifcheck.add_general_error(
                    "File is non-ascii as it has an UTF character in line "
                    + f"{i}, index {idx}."
                )
                return True

    return False


def _main():
    """Run as script

    Exit codes: 1 if the model file is not ASCII or CifCheck could not be
    run, 2 if CifCheck found issues with the mmCIF file.
    """
    # ToDo: for MA-pedantic check: use "_ma_target_ref_db_details" instead of
    #       "_struct_ref"
    opts = _parse_command_line()
    extend_requested = opts.extend_validated_file is not None

    # set up the CifCheck tool
    cifcheck = _CifCheck(opts.dict_sdb, opts.out_file, opts.verbose)

    # mmCIF is ASCII only, no need to go on with anything else
    if _file_has_utf(opts.model_cif, cifcheck):
        cifcheck.make_json_output()
        sys.exit(1)

    # check for associated files referenced by the model cif file
    assoc_files, model_cif_data, entry_id_map = _get_associated_files(
        opts.model_cif,
        opts.associates_dir,
        cifcheck,
    )
    # keep an untouched copy, merging below modifies model_cif_data
    if extend_requested:
        pristine_cif_data = copy.deepcopy(model_cif_data)
    # make sure associated files exist and merge all of them into the model
    for assoc, entry_id in assoc_files:
        cifcheck.add_to_results(
            _merge_cif_data(model_cif_data, assoc, entry_id, entry_id_map)
        )

    if assoc_files:
        # write merged data to disk, create tmp file, clean up when done
        tmp_fd, validate_file = tempfile.mkstemp(suffix=".cif", text=True)
        atexit.register(_try_os_remove, validate_file)
        os.close(tmp_fd)
        _write_mmcif(validate_file, model_cif_data)
    else:
        # nothing merged, the model file can be validated as it is
        validate_file = opts.model_cif

    # validate file
    if not cifcheck.run(validate_file):
        if opts.report:
            cifcheck.make_report()
        cifcheck.make_json_output()
        sys.exit(1)

    # upon request (-e) extend the ORIGINAL file (not the merged one)
    if extend_requested and not cifcheck.got_issues():
        cifcheck.add_versions_to_mmcif_file(
            pristine_cif_data, opts.extend_validated_file
        )

    # return JSON as file or to stdout
    if opts.out_file and opts.verbose:
        print(f"Writing results of CifCheck to '{opts.out_file}'")

    # print a report to stdout
    if opts.report:
        cifcheck.make_report()

    cifcheck.make_json_output()

    # If CifCheck found issues with the mmCIF file, exit with code 2. Exit
    # code 1 is reserved for general issues running the command, like "file
    # not found".
    if cifcheck.got_issues():
        sys.exit(2)


# Run the validation only when executed as a script, not when imported.
if __name__ == "__main__":
    _main()

#  LocalWords:  cif MSA FASTA pylint stdout CifCheck param src str dest cwd