Commit 112d00ed authored by B13nch3n

Spell check, PEP8

parent 72855eff
@@ -14,9 +14,6 @@ extension-pkg-allow-list='rapidjson'
 max-line-length=80
 [tool.pylint.deprecated_builtins]
 # We want to use proper logging, so we can control *ALL* output by the Abseil
 # logger, hence: deprecate 'print'
 bad-functions = ["map", "filter", "print"]
 # Run the spell check every once in a while, having it enabled always, is too
 # annoying.
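With `print` on the `bad-functions` list, output is meant to go through Abseil logging instead. A minimal sketch of the intended pattern, assuming the `absl-py` package (the snippet is illustrative, not part of this commit):

    # Route output through the Abseil logger instead of bare print() calls.
    from absl import app, logging

    def main(argv):
        del argv  # unused
        logging.set_verbosity(logging.INFO)
        logging.info("This replaces a plain print() call.")

    if __name__ == "__main__":
        app.run(main)  # also wires up Abseil's --verbosity flag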
CIF
Dockerfile
MSA
UTF
gzipped
stdout
uncategorised
usr
whitespaces
@@ -22,7 +22,7 @@ DCKR_IMG_RPO = ( # Docker image "name"
     "registry.scicore.unibas.ch/schwede/modelcif-converters/"
     + "mmcif-dict-suite"
 )
-# collection of docker commads used
+# collection of docker commands used
 DCKR_CMDS = {
     "build": [DCKR, "build"],
     "images": [DCKR, "images", "--format", "json"],
@@ -116,8 +116,8 @@ def _check_docker_installed():
 def _get_modelcif_dic_version():
-    """Get the latest versionstring of the ModelCIF dictionary from the
-    official GitHub repo."""
+    """Get the latest version string of the ModelCIF dictionary from the
+    official GitHub repository."""
     rspns = requests.get(
         "https://api.github.com/repos/ihmwg/ModelCIF/contents/archive",
         headers={"accept": "application/vnd.github+json"},
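The GitHub contents API returns a JSON array of file entries, so the latest version has to be derived from the file names. A minimal sketch of how the lookup could continue; the `v<major>.<minor>.<patch>` naming pattern and the sorting rule are assumptions, not taken from this diff:

    import re
    import requests

    def get_latest_dic_version():
        """Sketch: pick the highest version tag from the archive listing."""
        rspns = requests.get(
            "https://api.github.com/repos/ihmwg/ModelCIF/contents/archive",
            headers={"accept": "application/vnd.github+json"},
            timeout=30,
        )
        rspns.raise_for_status()
        versions = []
        for entry in rspns.json():
            # Assumed file naming, e.g. "mmcif_ma-v1.4.6.dic" (hypothetical).
            match = re.search(r"v(\d+)\.(\d+)\.(\d+)", entry["name"])
            if match:
                versions.append(tuple(int(g) for g in match.groups()))
        if not versions:
            raise RuntimeError("no version-tagged files found")
        return "v" + ".".join(str(n) for n in max(versions))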
@@ -352,7 +352,8 @@ def _main():
     if not opts.local:
         # Make sure Docker is installed and necessary commands are available.
         _do_step(_check_docker_installed, "checking Docker installation")
-        # Get expected image tag (latest ModelCIF dic version from GitHub)
+        # Get expected image tag (latest ModelCIF dictionary version from
+        # GitHub).
         dic_version = _do_step(
             _get_modelcif_dic_version,
             "fetching latest ModelCIF dictionary version",
@@ -405,6 +406,3 @@ def _main():
 if __name__ == "__main__":
     _main()
-
-# LocalWords: pylint argparse ArgumentParser subprocess sys DCKR args exc
-# LocalWords: stdout stderr FileNotFoundError CalledProcessError returncode
@@ -11,9 +11,9 @@ and thus, won't be merged into the model mmCIF file and won't be checked.
 # pylint: enable=invalid-name
 # ToDo: enable testing of gzipped files
-# ToDo: add "modelcif-pedantic" mode, fail on categories that are technically
+# ToDo: add `modelcif-pedantic` mode, fail on categories that are technically
 #       allowed but discouraged to be used, like _exptl
-# ToDo: Remove pip installs which are in requirements.txt from Dockerfile
+# ToDo: Remove pip installs which are in `requirements.txt` from Dockerfile

 from io import TextIOWrapper
 import argparse
@@ -41,7 +41,6 @@ import mmcif.io.PdbxExceptions
 def _parse_command_line():
     """Get arguments."""
     parser = argparse.ArgumentParser(description=__doc__)
-
     parser.add_argument(
         "model_cif",
         type=str,
@@ -102,9 +101,7 @@ def _parse_command_line():
         help="Write some messages to stdout instead of just having it as JSON. "
         + "Useful for debugging on the command line.",
     )
-
     opts = parser.parse_args()
-
     # post process arguments
     if opts.extend_validated_file is not None:
         if opts.extend_validated_file == " same ":
@@ -158,7 +155,7 @@ def _parse_parser_file(filename):
 def _parse_diag_file(filename):
     """Parse the diagnosis file of CifCheck."""
-    # CifCheck places the diag file in the cwd.
+    # CifCheck places the diag file in the current working directory.
     diagfile = filename + "-diag.log"
     if not os.path.exists(diagfile):
         return []
@@ -285,6 +282,50 @@ def _unzip_arc_cif(arc_zip, cif_file):
     return assoc_data
+
+
+def _get_assoc_data_from_zip_arc(
+    dat_cat, archives, assoc_dir, assoc_files, cifcheck
+):
+    """Extract data to be appended to the main CIF file from associated
+    archives."""
+    idxs = _get_indeces(
+        dat_cat,
+        ["archive_file_id", "file_content", "file_format", "file_path"],
+    )
+    last_arc_id = ""
+    arc_zip = None
+    for row in dat_cat:
+        # Get a ZipFile object of the archive to read CIF files and check
+        # the presence of non-CIF files.
+        arc_id = row[idxs["archive_file_id"]]
+        arc_file = archives[arc_id][0]
+        if arc_id != last_arc_id:
+            last_arc_id = arc_id
+            if arc_zip is not None:
+                arc_zip.close()
+            arc_zip, arc_namelist = _get_arc_zipfile_handle(
+                arc_file, assoc_dir
+            )
+        if row[idxs["file_format"]] == "cif":
+            if row[idxs["file_content"]] == "local pairwise QA scores":
+                cif_file = row[idxs["file_path"]]
+                data = _unzip_arc_cif(arc_zip, cif_file)
+                assoc_files.append((data, archives[arc_id][1]))
+            elif row[idxs["file_content"]] != "other":
+                raise RuntimeError(
+                    "Unknown associated CIF file content "
+                    + f"found: {row[idxs['file_content']]}"
+                )
+        else:
+            if row[idxs["file_path"]] not in arc_namelist:
+                cifcheck.add_general_error(
+                    f"ma_entry_associated_files.file_url '{arc_file}' is "
+                    + "missing "
+                    + "ma_associated_archive_file_details.file_path "
+                    + f"'{row[idxs['file_path']]}'"
+                )
+    arc_zip.close()


 def _get_associated_files(model_cif_file, assoc_dir, cifcheck):
     """Get the list of associated files from a model CIF file."""
     # This is an intermediate step, so we do not need to check/ report anything
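Both the new helper and the old inline loop lean on `_get_arc_zipfile_handle`, which this diff references but never shows. A plausible reconstruction, assuming it opens the archive below the associated-files directory and returns the handle plus its member list (the body is a guess, not the repository's code):

    import os
    import zipfile

    def _get_arc_zipfile_handle(arc_file, assoc_dir):
        """Sketch: open an associated archive and list its members."""
        arc_zip = zipfile.ZipFile(os.path.join(assoc_dir, arc_file))
        return arc_zip, arc_zip.namelist()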
@@ -328,52 +369,14 @@ def _get_associated_files(model_cif_file, assoc_dir, cifcheck):
         dat_cat = pdbx_cntnr.getObj("ma_associated_archive_file_details")
         if dat_cat is None:
             continue
-        idxs = _get_indeces(
-            dat_cat,
-            ["archive_file_id", "file_content", "file_format", "file_path"],
-        )
-        # get associated files/ data that can be added to the CIF content
-        last_arc_id = ""
-        arc_zip = None
-        for row in dat_cat:
-            # Get a ZipFile object of the archive to read CIF files and check
-            # the presence of non-CIF files.
-            arc_id = row[idxs["archive_file_id"]]
-            arc_file = archives[arc_id][0]
-            if arc_id != last_arc_id:
-                last_arc_id = arc_id
-                if arc_zip is not None:
-                    arc_zip.close()
-                arc_zip, arc_namelist = _get_arc_zipfile_handle(
-                    arc_file, assoc_dir
-                )
-            if row[idxs["file_format"]] == "cif":
-                if row[idxs["file_content"]] == "local pairwise QA scores":
-                    cif_file = row[idxs["file_path"]]
-                    data = _unzip_arc_cif(arc_zip, cif_file)
-                    assoc_files.append((data, archives[arc_id][1]))
-                elif row[idxs["file_content"]] != "other":
-                    raise RuntimeError(
-                        "Unknown associated CIF file content "
-                        + f"found: {row[idxs['file_content']]}"
-                    )
-            else:
-                if row[idxs["file_path"]] not in arc_namelist:
-                    cifcheck.add_general_error(
-                        f"ma_entry_associated_files.file_url '{arc_file}' is "
-                        + "missing "
-                        + "ma_associated_archive_file_details.file_path "
-                        + f"'{row[idxs['file_path']]}'"
-                    )
-        arc_zip.close()
+        _get_assoc_data_from_zip_arc(
+            dat_cat, archives, assoc_dir, assoc_files, cifcheck
+        )
     return assoc_files, mdl_cif, entry_id_map
-
-
-# ToDo: def _get_assoc_data():
-# """Extract data to be appended to the main CIF file from associated archives."""


 def _cmp_cif_rows(a_row, b_row, a_idxs, b_idxs, attrs):
     """Compare two CIF rows by given attributes"""
     for i in attrs:
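The body of `_cmp_cif_rows` is cut off by the hunk boundary; judging from its callers it should report whether two rows agree on the given attributes. A sketch of the likely shape (assumed, not the actual code):

    def _cmp_cif_rows(a_row, b_row, a_idxs, b_idxs, attrs):
        """Compare two CIF rows by given attributes."""
        for i in attrs:
            # The index dicts map attribute names to row positions.
            if a_row[a_idxs[i]] != b_row[b_idxs[i]]:
                return False
        return True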
@@ -385,7 +388,7 @@ def _cmp_cif_rows(a_row, b_row, a_idxs, b_idxs, attrs):
 def _add_row(row, src_idxs, dest_idxs, dest, attrs_l):
     """Add a data row to an existing data block with the right item order."""
-    # create a new row fitting dest's order
+    # create a new row fitting `dest`'s order
     new_row = list("?" * attrs_l)
     for i, j in src_idxs.items():
         new_row[dest_idxs[i]] = row[j]
@@ -394,7 +397,7 @@
 def _add_or_extend_rows(src, dest, common, not_in_dest):
     """Mix/ add rows from `src` into `dest`."""
-    # extend dest with new attributes
+    # extend `dest` with new attributes
     for attr in not_in_dest:
         dest.appendAttribute(attr)
     s_idx = src.getAttributeIndexDict()
@@ -414,7 +417,7 @@
                 break
         if not match:
            _add_row(src_row, s_idx, d_idx, dest, attrs_l)
-    # extend dest rows that never matched with "?" as default value
+    # extend `dest` rows that never matched with "?" as default value
     for i in d_rows:
         dest_row = dest[i]
         for attr in not_in_dest:
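To make the merge semantics concrete, a small usage sketch with the `mmcif` package used elsewhere in this script; the category and attribute names are hypothetical, and the return order of `cmpAttributeNames` is inferred from the call in the next hunk:

    from mmcif.api.DataCategory import DataCategory

    # Two overlapping categories: same name, partially different attributes.
    dest = DataCategory("ma_qa_metric", ["id", "name"], [["1", "pLDDT"]])
    src = DataCategory("ma_qa_metric", ["id", "mode"], [["1", "local"]])

    not_in_dest, in_both, _ = src.cmpAttributeNames(dest)
    _add_or_extend_rows(src, dest, in_both, not_in_dest)
    # dest now carries id, name and mode; the matching row was extended,
    # and any dest row without a counterpart in src keeps "?" for mode.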
@@ -436,7 +439,7 @@ def _merge_cif_datacontainer(
             not_in_p, in_both, _ = db_ctgry.cmpAttributeNames(p_ctgry)
             _add_or_extend_rows(db_ctgry, p_ctgry, in_both, not_in_p)
         else:
-            # data category does not exist in parent, append it to datablock
+            # data category does not exist in parent, append it to data block
             parent_datablock.append(db_ctgry)
@@ -452,14 +455,14 @@ def _merge_cif_data(model_cif_data, assoc_cif, row_entry_id, entry_id_map):
     """Merge contents of an associated file into CIF data."""
     error_msgs = {"cifcheck-errors": []}
-    # per datablock, check to which datablock it belongs in the parent cif
+    # per data block, check to which data block it belongs in the parent CIF
     for assoc_cntnr in assoc_cif:
         # check/ get 'entry_link'
         assoc_entry_link = assoc_cntnr.getObj("entry_link")
         if assoc_entry_link is None:
             error_msgs["cifcheck-errors"].append(
                 'ERROR - category "entry_link" is mandatory, but it is not '
-                + f'present in datablock "{assoc_cntnr.getName()}"'
+                + f'present in data block "{assoc_cntnr.getName()}"'
             )
             continue
         # make sure entry_id exists for entry_link
@@ -470,8 +473,8 @@ def _merge_cif_data(model_cif_data, assoc_cif, row_entry_id, entry_id_map):
                 + '"entry_id" is not in category "entry_link"'
             )
             continue
-        # For each entry_id, look up the corresponding datablock in
-        # model_cif_data and merge with that datablock.
+        # For each entry_id, look up the corresponding data block in
+        # model_cif_data and merge with that data block.
         for row in assoc_entry_link:
             entry_id = row[entry_id_idx]
             if entry_id != row_entry_id:
@@ -564,10 +567,10 @@ class _CifCheck:
     def _execute(self, filepath):
         """Execute the CifCheck tool on a model mmCIF file."""
         # If permission errors occur with the source directory of the CIF file,
-        # consider copying the file to a Python tempfile generated path. That
-        # deals with missing $TMP, $TEMP, etc.... variables.
-        # At the moment, cwd is switched to the source directory since CifCheck
-        # copies the file, otherwise.
+        # consider copying the file to a Python `tempfile` generated path. That
+        # deals with missing `$TMP`, `$TEMP`, etc. variables.
+        # At the moment, current working directory is switched to the source
+        # directory since CifCheck copies the file, otherwise.
         cifcheck_filepath = os.path.basename(filepath)
         cifcheck_cmd = [
             "CifCheck",
@@ -671,7 +674,7 @@ class _CifCheck:
                 for x in self.version["versions"]
             ],
         )
-        # We want nicely formatted cif files, so place audit_conform
+        # We want nicely formatted CIF files, so place audit_conform
         # after entry.
         objs = pdbx_cntnr.getObjCatalog()
         names = list(objs.keys())
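The reordering itself is outside the hunk; one way to realize "audit_conform right after entry" on the name list, shown with hypothetical category names:

    # Sketch: move audit_conform directly behind entry in the category order.
    names = ["entry", "struct", "audit_conform", "citation"]  # hypothetical
    names.remove("audit_conform")
    names.insert(names.index("entry") + 1, "audit_conform")
    # names == ["entry", "audit_conform", "struct", "citation"]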
@@ -802,7 +805,8 @@ class _CifCheck:
             line,
         )
         if match is not None:
-            # prepare a string to be removed from Mismatching parent/ child relationships
+            # prepare a string to be removed from Mismatching parent/ child
+            # relationships
             chld = match.group("chld").split(".")[0][1:]
             prnt = match.group("prnt").split(".")[0][1:]
             try:
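The regular expression producing `match` lives outside this hunk; a hypothetical pattern with the same named groups illustrates the `.split('.')[0][1:]` trimming (CifCheck's real message format may differ):

    import re

    # Hypothetical diagnostic line; the real CifCheck format may differ.
    line = 'Mismatching parent/ child relationships "_chld.attr" - "_prnt.attr"'
    match = re.search(r'"(?P<chld>[^"]+)" - "(?P<prnt>[^"]+)"', line)
    if match is not None:
        # '_chld.attr' -> '_chld' -> drop the leading underscore -> 'chld'
        chld = match.group("chld").split(".")[0][1:]
        prnt = match.group("prnt").split(".")[0][1:]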
@@ -872,7 +876,7 @@ class _CifCheck:
         self._condense_diagnosis_data()
         self._condense_other_errors()
-        # print erros/ messages caught
+        # print errors/ messages caught
         if len(self.check_results["cifcheck-errors"]) > 0:
             print("Errors by running CifCheck:")
             for line in self.check_results["cifcheck-errors"]:
@@ -933,7 +937,7 @@ def _main():
         cifcheck.make_json_output()
         sys.exit(1)
-    # check for associated files referenced by the model cif file
+    # check for associated files referenced by the model CIF file
     assoc_files, model_cif_data, entry_id_map = _get_associated_files(
         opts.model_cif,
         opts.associates_dir,
@@ -944,15 +948,15 @@ def _main():
     o_model_cif_data = copy.deepcopy(model_cif_data)
     # make sure associated files exist and merge all of them into the model
     for assoc, entry_id in assoc_files:
-        # merge the model.cif and the associated file
+        # merge the model CIF and the associated file
         msgs = _merge_cif_data(model_cif_data, assoc, entry_id, entry_id_map)
         cifcheck.add_to_results(msgs)
     validate_file = opts.model_cif
     if assoc_files:
-        # write merged data to disk, create tmp file, clean up when done
+        # write merged data to disk, create temporary file, clean up when done
         cfh, cfn = tempfile.mkstemp(suffix=".cif", text=True)
-        # register for deletion here and in cwd
+        # register for deletion here and in current working directory
         atexit.register(_try_os_remove, cfn)
         os.close(cfh)
         _write_mmcif(cfn, model_cif_data)
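`_try_os_remove` is registered with `atexit` but defined outside this diff; a plausible minimal implementation (assumed, not the repository's code):

    import os

    def _try_os_remove(path):
        """Sketch: remove a file, ignoring it if it is already gone."""
        try:
            os.remove(path)
        except OSError:
            pass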
@@ -992,5 +996,3 @@ def _main():
 if __name__ == "__main__":
     _main()
-
-# LocalWords: cif MSA FASTA pylint stdout CifCheck param src str dest cwd