Skip to content
Snippets Groups Projects
Commit 81dfbb2b authored by B13nch3n's avatar B13nch3n
Browse files

Read associated Zip archives.

parent 0a3c7dc4
Branches
No related tags found
No related merge requests found
......@@ -10,6 +10,7 @@ and thus, won't be merged into the model mmCIF file and won't be checked.
# pylint: disable=invalid-name
# pylint: enable=invalid-name
from io import TextIOWrapper
import argparse
import atexit
import copy
......@@ -17,6 +18,7 @@ import os
import subprocess
import sys
import tempfile
import zipfile
import rapidjson as json
......@@ -169,11 +171,15 @@ class _CifCheckFailedError(RuntimeError):
self.cifcheck_cmd = cifcheck_cmd
def _read_mmcif(filepath_or_object):
    """Read a mmCIF file into a list of data blocks.

    :param filepath_or_object: path of a mmCIF file (str or os.PathLike) or
                               an already-open text file object, e.g. a
                               TextIOWrapper over a Zip archive member.
    :return: list of data containers as produced by PdbxReader.read().
    """
    data_lst = []
    # Accept os.PathLike alongside plain strings so pathlib.Path arguments
    # are opened here instead of being mistaken for open file handles.
    if isinstance(filepath_or_object, (str, os.PathLike)):
        with open(filepath_or_object, encoding="utf-8") as ifh:
            prd = PdbxReader(ifh)
            prd.read(data_lst)
    else:
        # Assume a readable text stream; the caller owns/closes it.
        prd = PdbxReader(filepath_or_object)
        prd.read(data_lst)
    return data_lst
......@@ -188,6 +194,17 @@ def _write_mmcif(filepath, cif_data):
cifwriter.write(cif_data)
def _get_indeces(data_category, attribute_list):
"""Get column indexes for a list of attributes."""
idxs = {}
for attr in attribute_list:
idxs[attr] = data_category.getAttributeIndex(attr)
if idxs[attr] == -1:
return {}
return idxs
def _get_entry_id(cif_datablock, entry_id_map, datablock_idx):
"""Get a mapping of the entry.id from a cif datablock."""
entry = cif_datablock.getObj("entry")
......@@ -198,7 +215,17 @@ def _get_entry_id(cif_datablock, entry_id_map, datablock_idx):
entry_id_map[row[eidx]] = datablock_idx
def _get_associated_files(model_cif_file):
def _unzip_arc_cif(arc_file, cif_file, assoc_dir):
    """Extract a cif file from a ZIP archive.

    Opens the archive `arc_file` located in `assoc_dir`, reads the member
    `cif_file` as UTF-8 text and parses it with _read_mmcif().
    """
    archive_path = os.path.join(assoc_dir, arc_file)
    with zipfile.ZipFile(archive_path) as arc_zip:
        # ZipFile.open() yields a binary stream; wrap it for text decoding.
        with TextIOWrapper(arc_zip.open(cif_file), encoding="utf-8") as cif_fh:
            return _read_mmcif(cif_fh)
def _get_associated_files(model_cif_file, assoc_dir):
"""Get the list of associated files from a model cif file."""
# This is an intermediate step, so we do not need to check/ report anything
# here. The actual confirmation comes out of CifCheck at a later stage.
......@@ -206,40 +233,45 @@ def _get_associated_files(model_cif_file):
entry_id_map = {}
assoc_files = []
idxs = {}
archives = {}
for i, pdbx_cntnr in enumerate(mdl_cif):
# gather entry.id's for later
_get_entry_id(pdbx_cntnr, entry_id_map, i)
meafs = pdbx_cntnr.getObj("ma_entry_associated_files")
dat_cat = pdbx_cntnr.getObj("ma_entry_associated_files")
# If ma_entry_associated_files is not present then
# ma_associated_archive_file_details can't exist either since it has a
# ma_entry_associated_files.id relation. (CifCheck should notice that.)
if meafs is None:
if dat_cat is None:
continue
not_found = False
for j in ["file_format", "file_url", "entry_id"]:
idxs[j] = meafs.getAttributeIndex(j)
if idxs[j] == -1:
not_found = True
break
if not_found:
idxs = _get_indeces(
dat_cat, ["entry_id", "file_format", "file_type", "file_url", "id"]
)
if not idxs:
continue
for row in meafs:
for row in dat_cat:
if row[idxs["file_type"]] == "archive":
archives[row[idxs["id"]]] = (
row[idxs["file_url"]],
row[idxs["entry_id"]],
)
if row[idxs["file_format"]] != "cif":
continue
assoc_files.append((row[idxs["file_url"]], row[idxs["entry_id"]]))
data = _read_mmcif(os.path.join(assoc_dir, row[idxs["file_url"]]))
assoc_files.append((data, row[idxs["entry_id"]]))
# make sure entry_id is matching in associated file!
maafd = pdbx_cntnr.getObj("ma_associated_archive_file_details")
if maafd is None:
continue
idxs["file_format"] = maafd.getAttributeIndex("file_format")
if idxs["file_format"] == -1:
dat_cat = pdbx_cntnr.getObj("ma_associated_archive_file_details")
if dat_cat is None:
continue
for row in maafd:
idxs = _get_indeces(
dat_cat, ["archive_file_id", "file_format", "file_path"]
)
for row in dat_cat:
if row[idxs["file_format"]] == "cif":
raise NotImplementedError(
"Fetching associated cif files from archive."
)
arc_id = row[idxs["archive_file_id"]]
arc_file = archives[arc_id][0]
cif_file = row[idxs["file_path"]]
data = _unzip_arc_cif(arc_file, cif_file, assoc_dir)
assoc_files.append((data, archives[arc_id][1]))
return assoc_files, mdl_cif, entry_id_map
......@@ -318,12 +350,9 @@ def _try_os_remove(path):
pass
def _merge_cif_data(
model_cif_data, associated_path, row_entry_id, entry_id_map
):
def _merge_cif_data(model_cif_data, assoc_cif, row_entry_id, entry_id_map):
"""Merge contents of an associated file into cif data."""
error_msgs = {"cifcheck-errors": []}
assoc_cif = _read_mmcif(associated_path)
# per datablock, check to which datablock it belongs in the parent cif
for assoc_cntnr in assoc_cif:
......@@ -627,18 +656,16 @@ def _main():
# check for associated files referenced by the model cif file
assoc_files, model_cif_data, entry_id_map = _get_associated_files(
opts.model_cif
opts.model_cif,
opts.associates_dir,
)
# save original data for later
if opts.extend_validated_file is not None:
o_model_cif_data = copy.deepcopy(model_cif_data)
# make sure associated files exist and merge all of them into the model
for assoc, entry_id in assoc_files:
assoc_path = os.path.join(opts.associates_dir, assoc)
# merge the model.cif and the associated file
msgs = _merge_cif_data(
model_cif_data, assoc_path, entry_id, entry_id_map
)
msgs = _merge_cif_data(model_cif_data, assoc, entry_id, entry_id_map)
cifcheck.add_to_results(msgs)
validate_file = opts.model_cif
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment