diff --git a/validation/.spelling b/validation/.spelling
new file mode 100644
index 0000000000000000000000000000000000000000..fecbba386b657adaafab938a7e299310259ce95f
--- /dev/null
+++ b/validation/.spelling
@@ -0,0 +1,4 @@
+CIF
+UTF
+stdout
+uncategorised
diff --git a/validation/validate-mmcif-file.py b/validation/validate-mmcif-file.py
index b76bea0b2938c05e219a94a9b2298d3467e7c124..8d05e56299b2f4d10b2e1830c4108e47faebe8cd 100755
--- a/validation/validate-mmcif-file.py
+++ b/validation/validate-mmcif-file.py
@@ -2,7 +2,7 @@
 """Validate mmCIF format in a model mmCIF file.
 
 Does not check if the model/ coordinates make sense. But includes associated
-cif files in the check by merging files. That is, as an example, associated
+CIF files in the check by merging files. That is, as an example, associated
 files with quality scores stored in mmCIF format will be merged with the model
 file and checked, but associated MSA files in FASTA format can not be merged
 and thus, won't be merged into the model mmCIF file and won't be checked.
@@ -46,7 +46,7 @@ def _parse_command_line():
         "model_cif",
         type=str,
         metavar="<MODEL MMCIF FILE>",
-        help="Path to the model mmCIF file. This is the 'main' cif file of a "
+        help="Path to the model mmCIF file. This is the 'main' CIF file of a "
         + "modelling project including coordinates.",
     )
     parser.add_argument(
@@ -120,7 +120,7 @@ def _error(msg):
 
 
 def _warn(msg):
-    """Pritn a warning message."""
+    """Print a warning message."""
     print(f"WARNING: {msg}", file=sys.stderr)
 
 
@@ -223,7 +223,7 @@ def _get_indeces(data_category, attribute_list):
 
 
 def _get_entry_id(cif_datablock, entry_id_map, datablock_idx):
-    """Get a mapping of the entry.id from a cif datablock."""
+    """Get a mapping of the entry.id from a CIF data block."""
    entry = cif_datablock.getObj("entry")
    if entry is not None:
        eidx = entry.getAttributeIndex("id")
@@ -277,7 +277,7 @@ def _get_arc_zipfile_handle(arc_file, assoc_dir):
 
 
 def _unzip_arc_cif(arc_zip, cif_file):
-    """Extract a cif file from a ZIP archive."""
+    """Extract a CIF file from a ZIP archive."""
     assoc_data = []
     with TextIOWrapper(arc_zip.open(cif_file), encoding="utf-8") as cif_fh:
         assoc_data = _read_mmcif(cif_fh)
@@ -286,7 +286,7 @@
 
 
 def _get_associated_files(model_cif_file, assoc_dir, cifcheck):
-    """Get the list of associated files from a model cif file."""
+    """Get the list of associated files from a model CIF file."""
     # This is an intermediate step, so we do not need to check/ report anything
     # here. The actual confirmation comes out of CifCheck at a later stage.
     entry_id_map = {}
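For context: `_unzip_arc_cif()` above streams a CIF member out of a ZIP archive as UTF-8 text instead of extracting it to disk. A minimal, self-contained sketch of that pattern, with `parse_cif()` as a hypothetical stand-in for the script's `_read_mmcif()` helper and placeholder file names:

```python
import zipfile
from io import TextIOWrapper


def parse_cif(file_handle):
    """Hypothetical stand-in for _read_mmcif(); just returns the raw lines."""
    return file_handle.readlines()


# "archive.zip" and "scores.cif" are placeholder names, not from this diff.
with zipfile.ZipFile("archive.zip") as arc_zip:
    with TextIOWrapper(arc_zip.open("scores.cif"), encoding="utf-8") as cif_fh:
        assoc_data = parse_cif(cif_fh)
print(f"read {len(assoc_data)} lines")
```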
@@ -332,6 +332,7 @@ def _get_associated_files(model_cif_file, assoc_dir, cifcheck):
         dat_cat,
         ["archive_file_id", "file_content", "file_format", "file_path"],
     )
+    # get associated files/ data that can be added to the CIF content
     last_arc_id = ""
     arc_zip = None
     for row in dat_cat:
@@ -351,7 +352,7 @@
             cif_file = row[idxs["file_path"]]
             data = _unzip_arc_cif(arc_zip, cif_file)
             assoc_files.append((data, archives[arc_id][1]))
-        elif row[idxs["file_content"]] not in ["other"]:
+        elif row[idxs["file_content"]] != "other":
             raise RuntimeError(
                 "Unknown associated CIF file content "
                 + f"found: {row[idxs['file_content']]}"
             )
@@ -369,8 +370,12 @@ def _get_associated_files(model_cif_file, assoc_dir, cifcheck):
     return assoc_files, mdl_cif, entry_id_map
 
 
+# ToDo: def _get_assoc_data():
+#     """Extract data to be appended to the main CIF file from associated archives."""
+
+
 def _cmp_cif_rows(a_row, b_row, a_idxs, b_idxs, attrs):
-    """Compare two cif rows by given attributes"""
+    """Compare two CIF rows by given attributes."""
     for i in attrs:
         if a_row[a_idxs[i]] != b_row[b_idxs[i]]:
             return False
@@ -379,7 +384,7 @@
 
 
 def _add_row(row, src_idxs, dest_idxs, dest, attrs_l):
-    """Add a data row to an existing datablock with the right item order."""
+    """Add a data row to an existing data block with the right item order."""
     # create a new row fitting dest's order
     new_row = list("?" * attrs_l)
     for i, j in src_idxs.items():
@@ -388,7 +393,7 @@
 
 
 def _add_or_extend_rows(src, dest, common, not_in_dest):
-    """Mix/ add rows from src into dest."""
+    """Mix/ add rows from `src` into `dest`."""
     # extend dest with new attributes
     for attr in not_in_dest:
         dest.appendAttribute(attr)
@@ -419,7 +424,7 @@
 def _merge_cif_datacontainer(
     parent_datablock, datablock, exclude_categories=None
 ):
-    """Merge datablock into parent_datablock ignoring exclude_categories."""
+    """Merge data block into parent_datablock ignoring exclude_categories."""
     for category in datablock.getObjNameList():
         if category in exclude_categories:
             continue
@@ -444,7 +449,7 @@ def _try_os_remove(path):
 
 
 def _merge_cif_data(model_cif_data, assoc_cif, row_entry_id, entry_id_map):
-    """Merge contents of an associated file into cif data."""
+    """Merge contents of an associated file into CIF data."""
     error_msgs = {"cifcheck-errors": []}
 
     # per datablock, check to which datablock it belongs in the parent cif
@@ -485,6 +490,40 @@
     return error_msgs
 
 
+def _print_report(header, msgs, level=0):
+    """Print a message dictionary - report style."""
+    # check if there are any messages, otherwise skip
+    found_msgs = False
+    for lines in msgs.values():
+        if len(lines) > 0:
+            found_msgs = True
+            break
+    if not found_msgs:
+        return
+
+    lws = " " * level  # leading whitespaces
+    if level == 0:
+        print(f"{lws}{header}")
+    else:
+        print(f"{lws}{header}:")
+    for sctn, lines in msgs.items():
+        if len(lines) == 0:
+            continue
+        if isinstance(lines, (list, set)):
+            if len(lines) == 1:
+                print(f"{lws}  {sctn}: {lines.pop()}")
+                continue
+            print(f"{lws}  {sctn}:")
+            for line in lines:
+                print(f"{lws}    {line}")
+        elif isinstance(lines, dict):
+            _print_report(sctn, lines, level=level + 1)
+        else:
+            raise NotImplementedError(
+                f"Unsupported type {type(lines)} found " "for reporting."
+            )
+
+
 class _CifCheck:
     """Handling the CifCheck tool."""
 
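A usage sketch for the `_print_report()` helper added above; the message dictionary is invented for illustration. Lists/ sets print as indented sections, nested dicts recurse one level deeper, and single-entry sections are consumed via `pop()` and printed inline:

```python
msgs = {
    "Missing categories": {"_audit", "_struct"},  # multi-entry set
    "Missing items": set(),  # empty sections are skipped entirely
    "Archives": {"archive.zip": ["scores.cif"]},  # dict -> recursive call
}
_print_report("Diagnosis:", msgs)
# Prints roughly (set iteration order may vary):
# Diagnosis:
#   Missing categories:
#     _audit
#     _struct
#  Archives:
#    archive.zip: scores.cif
```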
@@ -708,25 +747,13 @@
             msgs["cifcheck-errors"]
         )
 
-    def make_report(self):
-        """Make a concise report out of the results.
-
-        Be aware, that cuts away the majority of the messages. But solving those
-        issues first, may already repair a mmCIF file."""
-        print("Report")
-        print("======")
-        print(f"Status of check: {self.check_results['status']}")
-        if "versions" in self.check_results:
-            print("CIF dictionaries used:")
-            for dct in self.check_results["versions"]:
-                print(f"  {dct['title']}/ {dct['version']}")
-                print(f"  {dct['location']}")
-
-        # condense diagnosis data
+    def _condense_diagnosis_data(self):
+        """Make the concise report bit for the "diagnosis" results."""
         rprt = {
-            "missing_cats": set(),
-            "missing_itms": set(),
-            "parchild_mm": set(),
+            "Datablock/ entry name": [],
+            "Missing categories": set(),
+            "Missing items": set(),
+            "Mismatching parent/ child relationships": set(),
         }
         for line in self.check_results["diagnosis"]:
             # missing categories
@@ -738,7 +765,7 @@
             ]:
                 match = re.match(pttrn, line)
                 if match is not None:
-                    rprt["missing_cats"].add(match.group("cat"))
+                    rprt["Missing categories"].add(match.group("cat"))
                     _check_dblock_name(match.group("dblock"), rprt)
                     break
             if match is not None:
@@ -750,7 +777,7 @@
             ]:
                 match = re.match(pttrn, line)
                 if match is not None:
-                    rprt["missing_itms"].add(
+                    rprt["Missing items"].add(
                         f"{match.group('cat')}.{match.group('itm')}"
                     )
                     _check_dblock_name(match.group("dblock"), rprt)
@@ -765,7 +792,7 @@
                 line,
             )
             if match is not None:
-                rprt["parchild_mm"].add(
+                rprt["Mismatching parent/ child relationships"].add(
                     f"{match.group('chld')}->{match.group('prnt')}"
                 )
                 _check_dblock_name(match.group("dblock"), rprt)
@@ -775,15 +802,17 @@
                 line,
             )
             if match is not None:
-                # prepare a string to be removed from parchild_mm
+                # prepare a string to be removed from "Mismatching parent/ child relationships"
                 chld = match.group("chld").split(".")[0][1:]
                 prnt = match.group("prnt").split(".")[0][1:]
                 try:
-                    rprt["parchild_mm"].remove(f"{chld}->{prnt}")
+                    rprt["Mismatching parent/ child relationships"].remove(
+                        f"{chld}->{prnt}"
+                    )
                 except KeyError:
                     pass
                 # add a more verbose line instead
-                rprt["parchild_mm"].add(
+                rprt["Mismatching parent/ child relationships"].add(
                     f"{match.group('chld')}->{match.group('prnt')}, "
                     + f"value={match.group('vle')}"
                 )
@@ -794,26 +823,12 @@
                 )
 
         # print above evaluation in the report
-        # datablock
-        print("Diagnosis:")
-        if "datablock" in rprt:
-            print("  Datablock/ entry name:", rprt["datablock"])
-        if len(rprt["missing_cats"]) > 0:
-            print("  Missing categories:")
-            for line in sorted(rprt["missing_cats"]):
-                print(f"    {line}")
-        if len(rprt["missing_itms"]) > 0:
-            print("  Missing items:")
-            for line in sorted(rprt["missing_itms"]):
-                print(f"    {line}")
-        if len(rprt["parchild_mm"]) > 0:
-            print("  Mismatching parent/ child relationships:")
-            for line in sorted(rprt["parchild_mm"]):
-                print(f"    {line}")
-
-        # condense 'other' errors
+        _print_report("Diagnosis:", rprt)
+
+    def _condense_other_errors(self):
+        """Gather errors not covered by diagnosis."""
         rprt = {
-            "missing_files": {},
+            "Missing (archive) files": {},
         }
         for line in self.check_results["errors"]:
             match = re.match(
@@ -823,11 +838,11 @@
             )
             if match is not None:
                 try:
-                    rprt["missing_files"][match.group("arc")].append(
+                    rprt["Missing (archive) files"][match.group("arc")].append(
                         match.group("fle")
                     )
                 except KeyError:
-                    rprt["missing_files"][match.group("arc")] = [
+                    rprt["Missing (archive) files"][match.group("arc")] = [
                         match.group("fle")
                     ]
                     continue
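The condensing above leans on `re.match()` with named groups (`cat`, `itm`, `dblock`, ...); the actual patterns sit in unchanged lines that this diff does not show. A toy example with an invented diagnosis-line format:

```python
import re

# Invented line format; CifCheck's real diagnosis wording differs.
line = 'ERROR - category "_ma_qa_metric" is missing in datablock model_1'
match = re.match(
    r'^ERROR - category "(?P<cat>[^"]+)" is missing '
    r"in datablock (?P<dblock>\S+)$",
    line,
)
if match is not None:
    print(match.group("cat"), match.group("dblock"))  # _ma_qa_metric model_1
```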
rprt["missing_files"][match.group("arc")] = [ + rprt["Missing (archive) files"][match.group("arc")] = [ match.group("fle") ] continue @@ -835,31 +850,46 @@ class _CifCheck: raise RuntimeError(f'Unmatched error line found:\n"""{line}"""') # print above evaluation in the report - print("Other issues:") - if len(rprt["missing_files"]) > 0: - print(" Missing (archive) )files:") - for arc, fles in rprt["missing_files"].items(): - print(f" {arc}:") - for line in fles: - print(f" {line}") + _print_report("Other issues:", rprt) + + def make_report(self): + """Make a concise report out of the results. + + Be aware, that cuts away the majority of the messages. But solving those + issues first, may already repair a mmCIF file.""" + print( + "Report\n======\nStatus of check: " + + f"{self.check_results['status']}" + ) + self.to_json() # get some extra data created for the JSON dump + if "versions" in self.check_results: + print("CIF dictionaries used:") + for dct in self.check_results["versions"]: + print( + f" {dct['title']}/ {dct['version']}\n" + + f" {dct['location']}" + ) + self._condense_diagnosis_data() + self._condense_other_errors() # print erros/ messages caught - print("Errors by running CifCheck:") - for line in self.check_results["cifcheck-errors"]: - print(f" {line}") + if len(self.check_results["cifcheck-errors"]) > 0: + print("Errors by running CifCheck:") + for line in self.check_results["cifcheck-errors"]: + print(f" {line}") def _check_dblock_name(name, report): - """Compare datablock names.""" + """Compare data block names.""" try: # pylint: disable=used-before-assignment - if report["datablock"] != name: + if report["Datablock/ entry name"][0] != name: raise RuntimeError( "Two different datablock (names) found: " - + f"{report['datablock']} vs {name}" + + f"{report['Datablock/ entry name'][0]} vs {name}" ) - except KeyError: - report["datablock"] = name + except IndexError: + report["Datablock/ entry name"] = [name] def _find_utf(line): @@ -933,7 +963,8 @@ def _main(): if not success: if opts.report: cifcheck.make_report() - cifcheck.make_json_output() + else: + cifcheck.make_json_output() sys.exit(1) # upon request (-e) extend the ORIGINAL file (not the merged one) @@ -949,8 +980,8 @@ def _main(): # print a report to stdout if opts.report: cifcheck.make_report() - - cifcheck.make_json_output() + else: + cifcheck.make_json_output() if cifcheck.got_issues(): # If CifCheck found issues with the mmCIF file, exit with code 2. Exit