Skip to content
Snippets Groups Projects
Commit fb3fecfc authored by Bienchen's avatar Bienchen
Browse files

Store non-selected models in archive upon command line option

parent 00d3f741
Branches
Tags
No related merge requests found
Biopython
CIF
DBs
FastA
Jupyter
MSA
......@@ -15,4 +16,4 @@ polypeptide
pre
repo
reproducibility
subdirectory
subdirectory
\ No newline at end of file
......@@ -13,6 +13,7 @@ import pickle
import re
import shutil
import sys
import tempfile
import zipfile
from Bio import SeqIO
......@@ -45,7 +46,7 @@ from alphapulldown.utils import make_dir_monomer_dictionary
# ToDo: Example 1 from the GitHub repo mentions MMseqs2
# ToDo: Discuss input of protocol steps, feature creation has baits, sequences
# does modelling depend on mode?
# ToDo: check that PAE files are written to an associated file
# ToDo: Option to add remaining models w PAE files to archive
# ToDo: deal with `--max_template_date`, beta-barrel project has it as software
# parameter
flags.DEFINE_string(
......@@ -59,16 +60,24 @@ flags.DEFINE_list(
flags.DEFINE_integer(
"model_selected",
None,
"model to be converted into ModelCIF, use '--select_all' to convert all "
+ "models found in '--af2_output'",
"model to be converted into ModelCIF, omit to convert all models found in "
+ "'--af2_output'",
)
flags.DEFINE_bool("compress", False, "compress the ModelCIF file using Gzip")
flags.DEFINE_bool(
"add_associated",
False,
"Add models not marked by "
+ "'--model_selected' to the archive for associated files",
)
flags.DEFINE_bool("compress", False, "compress the ModelCIF file(s) using Gzip")
flags.mark_flags_as_required(["ap_output", "monomer_objects_dir"])
FLAGS = flags.FLAGS
# ToDo: implement a flags.register_validator() checking that files/ directories
# exist as expected.
# ToDo: implement a flags.register_validator() to make sure that
# --add_associated is only activated if --model_selected is used, too
# pylint: disable=too-few-public-methods
......@@ -145,7 +154,7 @@ class _Biopython2ModelCIF(modelcif.model.AbInitioModel):
occupancy=atm.occupancy,
)
def add_scores(self, scores_json, entry_id, file_prefix, sw_dct):
def add_scores(self, scores_json, entry_id, file_prefix, sw_dct, add_files):
"""Add QA metrics"""
_GlobalPLDDT.software = sw_dct["AlphaFold"]
_GlobalPTM.software = sw_dct["AlphaFold"]
......@@ -221,6 +230,9 @@ class _Biopython2ModelCIF(modelcif.model.AbInitioModel):
)
]
if add_files:
arc_files.extend([x[1] for x in add_files.values()])
return modelcif.associated.Repository(
"",
[
......@@ -348,7 +360,7 @@ def _get_modelcif_protocol(
def _cast_release_date(release_date):
"""Type cast a date into datetime.date"""
"""Type cast a date into `datetime.date`"""
# "AF2" has a special meaning, those DBs did not change since the first
# release of AF2. This information is needed in the model-producing
# pipeline.
......@@ -414,7 +426,8 @@ def _store_as_modelcif(
mdl_file: str,
out_dir: str,
compress: bool = False,
# file_prfx, add_files
add_files: list = None,
# file_prfx
) -> None:
"""Create the actual ModelCIF file."""
system = modelcif.System(
......@@ -451,7 +464,7 @@ def _store_as_modelcif(
# process scores
mdl_file = os.path.splitext(os.path.basename(mdl_file))[0]
system.repositories.append(
model.add_scores(data_json, system.id, mdl_file, sw_dct)
model.add_scores(data_json, system.id, mdl_file, sw_dct, add_files)
)
system.model_groups.append(modelcif.model.ModelGroup([model]))
......@@ -493,33 +506,78 @@ def _store_as_modelcif(
# -> hence we cheat by changing path and back while being exception-safe...
oldpwd = os.getcwd()
os.chdir(out_dir)
created_files = {}
try:
mdl_file = f"{mdl_file}.cif"
with open(
f"{mdl_file}.cif",
mdl_file,
"w",
encoding="ascii",
) as mmcif_fh:
modelcif.dumper.write(mmcif_fh, [system])
if compress:
_compress_cif_file(f"{mdl_file}.cif")
mdl_file = _compress_cif_file(mdl_file)
created_files[mdl_file] = (
os.path.join(out_dir, mdl_file),
_get_assoc_mdl_file(mdl_file, data_json),
)
# Create associated archive
for archive in system.repositories[0].files:
with zipfile.ZipFile(
archive.path, "w", zipfile.ZIP_BZIP2
) as cif_zip:
for zfile in archive.files:
cif_zip.write(zfile.path, arcname=zfile.path)
os.remove(zfile.path)
try:
# Regardless of error, fall back to `zfile.path`; the
# other path is only needed as a special case.
# pylint: disable=bare-except
sys_path = add_files[zfile.path][0]
except:
sys_path = zfile.path
cif_zip.write(sys_path, arcname=zfile.path)
os.remove(sys_path)
created_files[archive.path] = (
os.path.join(out_dir, archive.path),
_get_assoc_zip_file(archive.path, data_json),
)
finally:
os.chdir(oldpwd)
return created_files
def _get_assoc_mdl_file(fle_path, data_json):
    """Build a `modelcif.associated.File` entry describing a model CIF file.

    `fle_path` is the file name/path stored in the archive metadata; the
    human-readable details come from the model name in `data_json`.
    """
    mdl_name = data_json["_ma_model_list.model_name"]
    assoc_file = modelcif.associated.File(fle_path, details=mdl_name)
    assoc_file.file_format = "cif"
    return assoc_file
def _get_assoc_zip_file(fle_path, data_json):
    """Build a `modelcif.associated.File` entry describing a ZIP archive.

    This is NOT the archive ZIP file for the PAEs; it describes a ZIP file
    that is itself stored inside the ZIP archive of the selected model.
    """
    details = (
        "archive with multiple files for "
        + data_json["_ma_model_list.model_name"]
    )
    assoc_zip = modelcif.associated.File(fle_path, details=details)
    assoc_zip.file_format = "other"
    return assoc_zip
def _compress_cif_file(cif_file):
"""Compress CIF file and delete original."""
cif_gz_file = cif_file + ".gz"
with open(cif_file, "rb") as f_in:
with gzip.open(cif_file + ".gz", "wb") as f_out:
with gzip.open(cif_gz_file, "wb") as f_out:
shutil.copyfileobj(f_in, f_out)
os.remove(cif_file)
return cif_gz_file
def _get_model_details(cmplx_name: str, data_json: dict) -> str:
......@@ -855,6 +913,7 @@ def alphapulldown_model_to_modelcif(
prj_dir: str,
monomer_objects_dir: list,
compress: bool = False,
additional_assoc_files: list = None,
) -> None:
"""Convert an AlphaPulldown model into a ModelCIF formatted mmCIF file.
......@@ -881,12 +940,36 @@ def alphapulldown_model_to_modelcif(
_get_scores(modelcif_json, mdl[1])
modelcif_json["ma_protocol_step"] = _get_protocol_steps(modelcif_json)
_store_as_modelcif(modelcif_json, structure, mdl[0], out_dir, compress)
cfs = _store_as_modelcif(
modelcif_json,
structure,
mdl[0],
out_dir,
compress,
additional_assoc_files,
)
# ToDo: ENABLE logging.info(f"... done with '{mdl[0]}'")
return cfs
def _get_model_list(ap_dir: str, model_selected: str) -> Tuple[str, str, list]:
def _add_mdl_to_list(mdl, model_list, mdl_path, score_files):
"""Fetch info from file name to add to list"""
rank = re.match(r"ranked_(\d+)\.pdb", mdl)
if rank is not None:
rank = int(rank.group(1))
model_list.append(
(
os.path.join(mdl_path, mdl),
score_files[rank][0],
score_files[rank][1], # model ID
score_files[rank][2], # model rank
)
)
def _get_model_list(
ap_dir: str, model_selected: str, get_non_selected: bool
) -> Tuple[str, str, list, list]:
"""Get the list of models to be converted.
If `model_selected` is none, all models will be marked for conversion."""
......@@ -924,28 +1007,25 @@ def _get_model_list(ap_dir: str, model_selected: str) -> Tuple[str, str, list]:
i,
)
# match PDB files with pickle files
not_selected_models = []
if model_selected is not None:
models.append(
(
os.path.join(mdl_path, f"ranked_{model_selected}.pdb"),
score_files[model_selected][0],
score_files[model_selected][1], # model ID
score_files[model_selected][2], # model rank
)
if model_selected not in score_files:
logging.info(f"Model of rank {model_selected} not found.")
sys.exit()
_add_mdl_to_list(
f"ranked_{model_selected}.pdb", models, mdl_path, score_files
)
if get_non_selected:
for mdl in os.listdir(mdl_path):
if mdl == f"ranked_{model_selected}.pdb":
continue
_add_mdl_to_list(
mdl, not_selected_models, mdl_path, score_files
)
else:
for mdl in os.listdir(mdl_path):
rank = re.match(r"ranked_(\d+)\.pdb", mdl)
if rank is not None:
rank = int(rank.group(1))
models.append(
(
os.path.join(mdl_path, mdl),
score_files[rank][0],
score_files[rank][1], # model ID
score_files[rank][2], # model rank
)
)
_add_mdl_to_list(mdl, models, mdl_path, score_files)
# check that files actually exist
for mdl, scrs, *_ in models:
......@@ -960,7 +1040,7 @@ def _get_model_list(ap_dir: str, model_selected: str) -> Tuple[str, str, list]:
)
sys.exit()
return cmplx, mdl_path, models
return cmplx, mdl_path, models, not_selected_models
def main(argv):
......@@ -986,9 +1066,26 @@ def main(argv):
del argv # Unused.
# get list of selected models and assemble ModelCIF files + associated data
complex_name, model_dir, model_list = _get_model_list(
FLAGS.ap_output, FLAGS.model_selected
complex_name, model_dir, model_list, not_selected = _get_model_list(
FLAGS.ap_output,
FLAGS.model_selected,
FLAGS.add_associated,
)
add_assoc_files = {}
if len(not_selected) > 0:
# pylint: disable=consider-using-with
ns_tmpdir = tempfile.TemporaryDirectory(suffix="_modelcif")
for mdl in not_selected:
add_assoc_files.update(
alphapulldown_model_to_modelcif(
complex_name,
mdl,
ns_tmpdir.name,
FLAGS.ap_output,
FLAGS.monomer_objects_dir,
FLAGS.compress,
)
)
for mdl in model_list:
alphapulldown_model_to_modelcif(
complex_name,
......@@ -997,6 +1094,7 @@ def main(argv):
FLAGS.ap_output,
FLAGS.monomer_objects_dir,
FLAGS.compress,
add_assoc_files,
)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment