From 81dfbb2b39f763d769a1af27ec5b515e18d8d12b Mon Sep 17 00:00:00 2001
From: B13nch3n <b13nch3n_01@theb-si.de>
Date: Thu, 18 Aug 2022 14:47:36 +0200
Subject: [PATCH] Read associated Zip archives.

---
 validation/validate-mmcif-file.py | 97 ++++++++++++++++++++-----------
 1 file changed, 63 insertions(+), 34 deletions(-)

diff --git a/validation/validate-mmcif-file.py b/validation/validate-mmcif-file.py
index dff916f..fbea4b7 100755
--- a/validation/validate-mmcif-file.py
+++ b/validation/validate-mmcif-file.py
@@ -10,6 +10,7 @@ and thus, won't be merged into the model mmCIF file and won't be checked.
 # pylint: disable=invalid-name
 # pylint: enable=invalid-name
 
+from io import TextIOWrapper
 import argparse
 import atexit
 import copy
@@ -17,6 +18,7 @@ import os
 import subprocess
 import sys
 import tempfile
+import zipfile
 
 import rapidjson as json
 
@@ -169,11 +171,15 @@ class _CifCheckFailedError(RuntimeError):
         self.cifcheck_cmd = cifcheck_cmd
 
 
-def _read_mmcif(filepath):
+def _read_mmcif(filepath_or_object):
     """Read a mmCIF file"""
     data_lst = []
-    with open(filepath, encoding="utf-8") as ifh:
-        prd = PdbxReader(ifh)
+    if isinstance(filepath_or_object, str):
+        with open(filepath_or_object, encoding="utf-8") as ifh:
+            prd = PdbxReader(ifh)
+            prd.read(data_lst)
+    else:
+        prd = PdbxReader(filepath_or_object)
         prd.read(data_lst)
     return data_lst
 
@@ -188,6 +194,17 @@ def _write_mmcif(filepath, cif_data):
     cifwriter.write(cif_data)
 
 
+def _get_indexes(data_category, attribute_list):
+    """Get column indexes for a list of attributes."""
+    idxs = {}
+    for attr in attribute_list:
+        idxs[attr] = data_category.getAttributeIndex(attr)
+        if idxs[attr] == -1:
+            return {}
+
+    return idxs
+
+
 def _get_entry_id(cif_datablock, entry_id_map, datablock_idx):
     """Get a mapping of the entry.id from a cif datablock."""
     entry = cif_datablock.getObj("entry")
@@ -198,7 +215,17 @@ def _get_entry_id(cif_datablock, entry_id_map, datablock_idx):
             entry_id_map[row[eidx]] = datablock_idx
 
 
-def _get_associated_files(model_cif_file):
+def _unzip_arc_cif(arc_file, cif_file, assoc_dir):
+    """Extract a cif file from a ZIP archive."""
+    assoc_data = []
+    with zipfile.ZipFile(os.path.join(assoc_dir, arc_file)) as arc_zip:
+        with TextIOWrapper(arc_zip.open(cif_file), encoding="utf-8") as cif_fh:
+            assoc_data = _read_mmcif(cif_fh)
+
+    return assoc_data
+
+
+def _get_associated_files(model_cif_file, assoc_dir):
     """Get the list of associated files from a model cif file."""
     # This is an intermediate step, so we do not need to check/ report anything
     # here. The actual confirmation comes out of CifCheck at a later stage.
@@ -206,40 +233,47 @@ def _get_associated_files(model_cif_file):
     entry_id_map = {}
 
     assoc_files = []
-    idxs = {}
+    archives = {}
     for i, pdbx_cntnr in enumerate(mdl_cif):
         # gather entry.id's for later
         _get_entry_id(pdbx_cntnr, entry_id_map, i)
-        meafs = pdbx_cntnr.getObj("ma_entry_associated_files")
+        dat_cat = pdbx_cntnr.getObj("ma_entry_associated_files")
         # If ma_entry_associated_files is not present then
         # ma_associated_archive_file_details can't exist either since it has a
         # ma_entry_associated_files.id relation. (CifCheck should notice that.)
-        if meafs is None:
+        if dat_cat is None:
             continue
-        not_found = False
-        for j in ["file_format", "file_url", "entry_id"]:
-            idxs[j] = meafs.getAttributeIndex(j)
-            if idxs[j] == -1:
-                not_found = True
-                break
-        if not_found:
+        idxs = _get_indexes(
+            dat_cat, ["entry_id", "file_format", "file_type", "file_url", "id"]
+        )
+        if not idxs:
             continue
-        for row in meafs:
+        for row in dat_cat:
+            if row[idxs["file_type"]] == "archive":
+                archives[row[idxs["id"]]] = (
+                    row[idxs["file_url"]],
+                    row[idxs["entry_id"]],
+                )
             if row[idxs["file_format"]] != "cif":
                 continue
-            assoc_files.append((row[idxs["file_url"]], row[idxs["entry_id"]]))
+            data = _read_mmcif(os.path.join(assoc_dir, row[idxs["file_url"]]))
+            assoc_files.append((data, row[idxs["entry_id"]]))
             # make sure entry_id is matching in associated file!
-        maafd = pdbx_cntnr.getObj("ma_associated_archive_file_details")
-        if maafd is None:
-            continue
-        idxs["file_format"] = maafd.getAttributeIndex("file_format")
-        if idxs["file_format"] == -1:
+        dat_cat = pdbx_cntnr.getObj("ma_associated_archive_file_details")
+        if dat_cat is None:
             continue
-        for row in maafd:
+        idxs = _get_indexes(
+            dat_cat, ["archive_file_id", "file_format", "file_path"]
+        )
+        if not idxs:
+            continue
+        for row in dat_cat:
             if row[idxs["file_format"]] == "cif":
-                raise NotImplementedError(
-                    "Fetching associated cif files from archive."
-                )
+                arc_id = row[idxs["archive_file_id"]]
+                arc_file = archives[arc_id][0]
+                cif_file = row[idxs["file_path"]]
+                data = _unzip_arc_cif(arc_file, cif_file, assoc_dir)
+                assoc_files.append((data, archives[arc_id][1]))
 
     return assoc_files, mdl_cif, entry_id_map
 
@@ -318,12 +352,9 @@ def _try_os_remove(path):
         pass
 
 
-def _merge_cif_data(
-    model_cif_data, associated_path, row_entry_id, entry_id_map
-):
+def _merge_cif_data(model_cif_data, assoc_cif, row_entry_id, entry_id_map):
     """Merge contents of an associated file into cif data."""
     error_msgs = {"cifcheck-errors": []}
-    assoc_cif = _read_mmcif(associated_path)
 
     # per datablock, check to which datablock it belongs in the parent cif
     for assoc_cntnr in assoc_cif:
@@ -627,18 +658,16 @@ def _main():
 
     # check for associated files referenced by the model cif file
     assoc_files, model_cif_data, entry_id_map = _get_associated_files(
-        opts.model_cif
+        opts.model_cif,
+        opts.associates_dir,
     )
     # save original data for later
     if opts.extend_validated_file is not None:
        o_model_cif_data = copy.deepcopy(model_cif_data)
     # make sure associated files exist and merge all of them into the model
     for assoc, entry_id in assoc_files:
-        assoc_path = os.path.join(opts.associates_dir, assoc)
         # merge the model.cif and the associated file
-        msgs = _merge_cif_data(
-            model_cif_data, assoc_path, entry_id, entry_id_map
-        )
+        msgs = _merge_cif_data(model_cif_data, assoc, entry_id, entry_id_map)
         cifcheck.add_to_results(msgs)
 
     validate_file = opts.model_cif
-- 
GitLab
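
The patch above streams associated mmCIF files straight out of the ZIP
archive: ZipFile.open() returns the member as a binary stream, io.TextIOWrapper
decodes it, and the wrapped handle goes through the same PdbxReader code path
as a regular file, so nothing is unpacked to disk. A minimal standalone sketch
of that flow, assuming the PdbxReader import from the py-mmcif package (the
script's actual import may differ) and made-up file names:

    from io import TextIOWrapper
    import zipfile

    from mmcif.io.PdbxReader import PdbxReader


    def read_mmcif_from_zip(zip_path, member):
        """Parse the datablocks of `member` inside the archive `zip_path`."""
        data_lst = []
        with zipfile.ZipFile(zip_path) as arc_zip:
            # ZipFile.open() yields bytes; TextIOWrapper decodes them to text
            # so the parser sees the same interface as a plain file handle.
            with TextIOWrapper(arc_zip.open(member), encoding="utf-8") as cif_fh:
                PdbxReader(cif_fh).read(data_lst)
        return data_lst

    # Hypothetical archive and member names, for illustration only.
    for block in read_mmcif_from_zip("model_assoc.zip", "local_pairwise_qa.cif"):
        print(block.getName())

Reading through a text wrapper also means a parse failure leaves no partially
extracted files behind for the validation run to clean up.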