Skip to content
Snippets Groups Projects
Select Git revision
  • 3fd33166449e4366a5cabd1a2adc2003009dfb8a
  • main default protected
  • feature-modelcif
  • 1.3.0
  • 1.2.0
  • 1.1.0
6 results

create_pipeline_in.py

Blame
  • create_pipeline_in.py 32.87 KiB
    import json
    import argparse
    from argparse import RawTextHelpFormatter
    import os
    import pandas as pd
    
    import ost
    
    from var3d.base import *
    from var3d import uniprot
    from var3d import pipeline
    from var3d import struct_importer
    from var3d import struct_anno
    
    
    class Logger(ost.LogSink):
        def __init__(self):
            ost.LogSink.__init__(self)
    
        def LogMessage(self, msg, severity):
            if severity == ost.LogLevel.Error:
                print("[ERROR] " + msg.replace(os.linesep, os.linesep + "        "))
            elif severity == ost.LogLevel.Warning:
                print(
                    "[WARNING] "
                    + msg.replace(os.linesep, os.linesep + "          ")
                )
            elif severity == ost.LogLevel.Info:
                print("[INFO] " + msg.replace(os.linesep, os.linesep + "       "))
    
    
    def LoadSequenceList(sequence_file):
        """Loads sequences in fasta format
    
        There would be ost.io.LoadSequenceList but it throws if there are empty
        sequences. Guess what, one of the default files has empty sequences 
        (RNA stuff). Additionally, some of the sequences have invalid characters
        ('+', whatever this stands for). We just skip them when reading.
        There will be an error raised if a required sequence is missing...
    
        :param sequence_file: Path to file with sequences in fasta format
        :type sequence_file: :class:`str`
        """
        slist = ost.seq.CreateSequenceList()
        curr_name = ""
        curr_seq = ""
        with open(sequence_file, "r") as fh:
            data = fh.readlines()
        for line in data:
            if line.startswith(">"):
                if len(curr_name) > 0 and len(curr_seq) > 0:
                    try:
                        s = ost.seq.CreateSequence(curr_name, curr_seq)
                    except:
                        pass  # yolo
                    slist.AddSequence(s)
                curr_name = line.strip().lstrip(">")
                curr_seq = ""
            else:
                curr_seq += line.strip()
        if len(curr_name) > 0 and len(curr_seq) > 0:
            try:
                s = ost.seq.CreateSequence(curr_name, curr_seq)
            except:
                pass  # yolo
            slist.AddSequence(s)
        return slist
    
    
    class SequenceContainer:
        def __init__(self, sequence_file):
            """Simple container for sequences
    
            Works for the two default reference proteom files and allows
            sequences to be retrieved by Uniprot AC or gene locus.
            The sequences in the loaded sequence list are processed such that
            their sequence name is set to uniprot AC or gene locus (i.e. 
            all additional annoation in the sequence name is gone).
    
            :param sequence_file: Path to file with sequences in fasta format
            :type sequence_file: :class:`str`
            """
            slist = LoadSequenceList(sequence_file)
            self.sequences = dict()
            for s in slist:
                sname = s.GetName()
                split_sname = sname.split("|")
                if len(split_sname) < 2:
                    raise RuntimeError("Could not load")
                if uniprot.ValidACPattern(split_sname[1]):
                    # it's from the uniprot reference proteome
                    sname = split_sname[1]
                else:
                    sname = split_sname[0]
                self.sequences[sname] = ost.seq.CreateSequence(sname, str(s))
    
        def Get(self, sname):
            """Getter sequence by name
    
            :param sname: Name of sequence you're looking for
            :type sname: :class:`str`
            """
            if sname in self.sequences:
                return self.sequences[sname]
            raise RuntimeError(f"Could not find sequence with name {sname}")
    
    
    class WHOVarImporter(VarImporter):
        def __init__(self, myctu_catalogue, drug_mapping):
            """
            :param myctu_catalogue: path to xlsx file found on
                                    https://www.who.int/publications/i/item/9789240028173
            :type myctu_catalogue: :class:`str`
            :param drug_mapping: Path to json file containing mappings
            :type drug_mapping: :class:`dict`
            """
            xl_file = pd.ExcelFile(myctu_catalogue)
            self._genome_indices = xl_file.parse("Genome_indices")
    
            self._variants = dict()  # key: gene identifier (names of sequences that
            # enter the Run function, expected to match
            # final_annotation.LocusTag column in
            # Genome_indices sheet of myctu_catalogue),
            # value: list of variants.
            # Will be filled the lazy way...
            self._drug_mapping = drug_mapping
    
        def Run(self, sequence, seq_range=None):
            sname = sequence.GetName()
            if sname not in self._variants:
                self._Process(sname, sequence)
            return self._variants[sname]
    
        def Key(self):
            return "who_var_importer"
    
        def _Process(self, sname, sequence):
            # filter by myctu id
            df = self._genome_indices[
                self._genome_indices["final_annotation.LocusTag"] == sname
            ]
            initial_len = len(df)
            ost.LogInfo("Number of variants in WHO table: " + str(initial_len))
            # filter by variants which lie on proteins
            df = df[~df["final_annotation.TentativeHGVSProteicAnnotation"].isnull()]
            ost.LogInfo("Number of genetic variants: " + str(initial_len - len(df)))
            # if there are no protein variants for this name, just set empty list
            if len(df) == 0:
                self._variants[sname] = list()
                return
            # collect variants
            parsed_variants = dict()
            for i, row in df.iterrows():
                try:
                    v = self._VarFromRow(row, sequence)
                    h = v.GetHash()
                    if h in parsed_variants:
                        ost.LogWarning(
                            f"Variant with equal hash already there. "
                            f"parsed variant: {json.dumps(v.ToJSON())}."
                            f"previously parsed variant: "
                            f"{json.dumps(parsed_variants[h].ToJSON())}."
                            f" => skip"
                        )
                    else:
                        parsed_variants[h] = v
                except Exception as e:
                    ost.LogError("Failed to import variant: " + str(e))
    
            self._variants[sname] = list(parsed_variants.values())
            ost.LogInfo(
                "Variants on UniProt sequence: "
                + str(len(parsed_variants.values()))
            )
    
        def _VarFromRow(self, row, seqres):
    
            hgvs_str = row["final_annotation.TentativeHGVSProteicAnnotation"]
            variant = Variant.FromHGVS(hgvs_str, seqres)
            try:
                variant.MatchesSEQRES(seqres)
            except Exception as e:
                raise RuntimeError(
                    f"Successfuly parsed HGVS variant ({hgvs_str})"
                    f" but it doesn't match the provided SEQRES "
                    f"({str(seqres)})"
                )
    
            # get the drugs and the grading of resistance
            pubchem_ids = list()
            who_gradings = list()
    
            conf_grades = [
                k for k, v in row.items() if v == v and k.endswith("_Conf_Grade")
            ]
            for cf in conf_grades:
                drug = cf[:3]  # expect three letter codes
                if drug not in self._drug_mapping:
                    raise RuntimeError(
                        f"WHO mutation catalogue specifies drug "
                        f"{drug} for which no PubChem mapping is "
                        f"defined."
                    )
                pubchem_ids.append(str(int(self._drug_mapping[drug])))
                who_gradings.append(row[cf])
            variant.AddMetaData("pubchemid", pubchem_ids)
            variant.AddMetaData("WHO_grading", who_gradings)
    
            # also parse the who labels to attach one final label to variant
            gradings = []
            for label in who_gradings:
                # hardcode all possible gradings we observed in the original data
                # raise an error in case of unkown grading
                if label == "1) Assoc w R":
                    gradings.append("resistant")
                elif label == "2) Assoc w R - Interim":
                    gradings.append("resistant")
                elif label == "3) Uncertain significance":
                    gradings.append("uncertain")
                elif label == "4) Not assoc w R - Interim":
                    gradings.append("neutral")
                elif label == "5) Not assoc w R":
                    gradings.append("neutral")
                elif label == "Synonymous":
                    gradings.append("synonymous")
                else:
                    raise RuntimeError(
                        f"Observed unknown drug resistance grading "
                        f"({label}) when parsing {hgvs_str} from "
                        f"sequence with name {seqres.GetName()}"
                    )
            if "resistant" in gradings:
                final_label = "WHO_RESISTANT"
            elif "uncertain" in gradings:
                final_label = "WHO_UNCERTAIN"
            elif "neutral" in gradings:
                final_label = "WHO_NEUTRAL"
            elif "synonymous" in gradings:
                final_label = "WHO_SYNONYMOUS"
            else:
                final_label = "WHO_UNCERTAIN"
            variant.AddMetaData("variant_label", final_label)
    
            return variant
    
    
    class WHOStructImporter(struct_importer.FilesystemStructImporter):
        def __init__(self, uniprot_sequences, structure_dir, structure_data):
            """Importer for custom set of structures
    
            :param uniprot_sequences: Container of Uniprot reference sequences
            :type uniprot_sequences: :class:`SequenceContainer`
            :param structure_dir: Root directory storing the structures in a 
                                  directory structure as described in the main
                                  function help
            :type structure_dir: :class:`str`
            :param structure_data: Custom csv file with annotations as descibed
                                   in the main function help
            :type structure_data: :class:`str`
            """
    
            struct_importer.FilesystemStructImporter.__init__(self)
    
            data = pd.read_csv(structure_data)
    
            for uniprot_ac in os.listdir(structure_dir):
                if not uniprot.ValidACPattern(uniprot_ac):
                    raise RuntimeError(
                        f"Expect all directory names in "
                        f"structure_dir ({structure_dir}) to "
                        f"have valid Uniprot AC patterns. Got "
                        f"{uniprot_ac}"
                    )
                uniprot_ac_path = os.path.join(structure_dir, uniprot_ac)
                for uniprot_ac_structure_dir in os.listdir(uniprot_ac_path):
                    structure_file = os.path.join(
                        uniprot_ac_path, uniprot_ac_structure_dir, "coords.pdb"
                    )
    
                    # check if structure exists
                    if not os.path.isfile(structure_file):
                        raise RuntimeError(f"{structure_file} does not exist!")
    
                    # get PubChemID from path, path is like
                    # ./data/TBvar3D/structure/[PubChemID]/
                    pcid = uniprot_ac_structure_dir.split(os.sep)[-1]
    
                    # filter drug target pair table to get unique entry for current
                    # Uniprot AC and PubChemID
                    df = data[
                        (data["prot_uniprotid"] == uniprot_ac)
                        & (data["drug_pubchemid"] == int(pcid))
                    ]
                    if len(df) != 1:
                        raise RuntimeError(
                            f"Expect exactly one line in "
                            f"structure_data csv per Uniprot AC / "
                            f"PubChem ID ({uniprot_ac}/{pcid}) pair "
                            f"got {len(df)}."
                        )
    
                    self._Process(
                        uniprot_ac,
                        uniprot_sequences.Get(uniprot_ac),
                        pcid,
                        structure_file,
                        df.iloc[0],
                    )
    
        def _Process(self, uniprot_ac, sequence, pcid, structure_file, row):
    
            # create structure identifier UniProtAC_PubChemID
            structure_identifier = uniprot_ac + "_" + pcid
    
            meta = {}
    
            # get current chains
            cur_chain = row["pdbid_chain"]
    
            # get current method of structure
            if row["method_structure"] == row["method_structure"]:
                meta["method_structure"] = str(row["method_structure"])
            else:
                meta["method_structure"] = None
    
            # get current oligomeric state
            if row["oligomeric_state"] == row["oligomeric_state"]:
                meta["oligomeric_state"] = str(row["oligomeric_state"])
            else:
                meta["oligomeric_state"] = None
    
            # get experimental pdbid
            if row["pdbid_exp"] == row["pdbid_exp"]:
                meta["experimental_pdbid"] = str(row["pdbid_exp"])
            else:
                meta["experimental_pdbid"] = None
    
            # get template pdbid
            if row["pdbid_hom"] == row["pdbid_hom"]:
                meta["template_pdbid"] = str(row["pdbid_hom"])
            else:
                meta["template_pdbid"] = None
    
            # get biounit
            if row["pdbid_biounit"] == row["pdbid_biounit"]:
                meta["biounit"] = str(row["pdbid_biounit"])
            else:
                meta["biounit"] = None
    
            # get free text description on how structure has been generated
            if row["structure_description"] == row["structure_description"]:
                meta["structure_description"] = str(row["structure_description"])
            else:
                meta["structure_description"] = None
    
            # get current pubchemid
            if row["drug_pubchemid"] == row["drug_pubchemid"]:
                meta["pubchemid"] = [str(int(row["drug_pubchemid"]))]
            else:
                meta["pubchemid"] = []
    
            # get current method of ligand
            if row["method_ligand"] == row["method_ligand"]:
                if len(meta["pubchemid"]) == 0:
                    ost.LogError("Specified method_ligand but no pubchem id!")
                meta["method_ligand"] = [str(row["method_ligand"])]
            else:
                # ensure same length as pubchemid
                meta["method_ligand"] = [None] * len(meta["pubchemid"])
    
            # get pdbid from which ligand was transfered
            if row["pdbid_trans"] == row["pdbid_trans"]:
                if len(meta["pubchemid"]) == 0:
                    ost.LogError("Specified pdbid_trans but no pubchem id!")
                meta["ligand_source_pdbid"] = [str(row["pdbid_trans"])]
            else:
                # ensure same length as pubchemid
                meta["ligand_source_pdbid"] = [None] * len(meta["pubchemid"])
    
            # get free text description on how ligand has been added
            if row["ligand_description"] == row["ligand_description"]:
                if len(meta["pubchemid"]) == 0:
                    ost.LogError("Specified ligand_description but no pubchem id!")
                meta["ligand_description"] = [str(row["ligand_description"])]
            else:
                # ensure same length as pubchemid
                meta["ligand_description"] = [None] * len(meta["pubchemid"])
    
            # get free text description on activated form (ask Erblin)
            if (
                row["activated_form_description"]
                == row["activated_form_description"]
            ):
                if len(meta["pubchemid"]) == 0:
                    ost.LogError(
                        "Specified activated_form_description but no pubchem id!"
                    )
                meta["activated_form_description"] = [
                    str(row["activated_form_description"])
                ]
            else:
                # ensure same length as pubchemid
                meta["activated_form_description"] = [None] * len(meta["pubchemid"])
    
            # get residue number of ligand residues
            cur_pdbligandid = row["pdbligandid"]
            if cur_pdbligandid == cur_pdbligandid:
                if len(meta["pubchemid"]) == 0:
                    ost.LogError("Specified pdbligandid but no pubchem id!")
                # avoid polluting output when loading PDB file
                ost.PushVerbosityLevel(ost.LogLevel.Error)
                ent = io.LoadPDB(structure_file)
                ost.PopVerbosityLevel()
                ent_ligand = ent.Select("cname=_ and rname=" + cur_pdbligandid)
                meta["ligand_res_num"] = [
                    [str(x.number) for x in ent_ligand.residues]
                ]
            else:
                # ensure same length as pubchemid
                meta["ligand_res_num"] = [None] * len(meta["pubchemid"])
    
            # create segment query list
            segment_query = []
            for chain_id in cur_chain:
                segment_query.append("cname=" + chain_id)
    
            # add current structure settings to struct importer base class
            self.Add(
                sequence,
                structure_identifier,
                structure_file,
                segment_query,
                meta=meta,
            )
    
    
    def ImportVariants(
        uniprot_sequence, who_sequence, var_import_pipeline, mechanism
    ):
    
        # variant import happens relative to who_sequence
        ost.LogInfo(f"Import variants for gene locus: {who_sequence.GetName()}")
        data = pipeline.DataContainer(who_sequence)
        data = var_import_pipeline.Import(data)
        ost.LogInfo(f"Imported {len(data.variants)} variants")
        aln = ost.seq.alg.SemiGlobalAlign(
            uniprot_sequence, who_sequence, ost.seq.alg.BLOSUM100
        )[0]
        assert aln.GetSequence(0).GetGaplessString() == str(uniprot_sequence)
        assert aln.GetSequence(1).GetGaplessString() == str(who_sequence)
        ost.LogInfo("Map variants with the following alignment:")
        ost.LogInfo(aln.ToString(80))
    
        # setup mapping with rigorous aln consistency checks
        # shouts a LogError for any of:
        # - mismatch, except if it's the first character of one of the sequences
        # - insertion in any of the sequences (except leading gaps)
        uniprot_rnum = 0
        who_rnum = 0
        rnum_mapper = dict()
        for col in aln:
            if col[0] != "-":
                uniprot_rnum += 1
            if col[1] != "-":
                who_rnum += 1
            # check for insertions
            if col[0] == "-" and uniprot_rnum >= 1:
                ost.LogError(
                    f"Alignment has insertion in Uniprot sequence after "
                    f"rnum {uniprot_rnum}"
                )
            if col[1] == "-" and who_rnum >= 1:
                ost.LogError(
                    f"Alignment has insertion in WHO sequence after "
                    f"rnum {who_rnum}"
                )
            # check for match and fill mapper
            if col[0] != "-" and col[1] != "-":
                rnum_mapper[who_rnum] = uniprot_rnum
                if col[0] != col[1]:
                    if who_rnum == 1:
                        msg = "Uniprot/WHO sequence alignment mismatch for WHO "
                        msg += "start codon. This is considered OK."
                        ost.LogWarning(msg)
                    else:
                        msg = f"Uniprot/WHO sequence alignment mismatch "
                        msg += f"({col[0]}{uniprot_rnum} vs. {col[1]}{who_rnum})"
                        ost.LogError(msg)
    
        # map to uniprot using the rnum mapping derived above
        # shouts a LogError for
        # - pos of WHO variant is not in mapper => not covered in alignment,
        #   i.e. position is not covered by Uniprot sequence
        # - Variant mapped to Uniprot fails in MatchesSeqres(uniprot_sequence)
        # The following special case is allowed: The sequences WHO refers to
        # sometimes have a different start AA, whereas uniprot consistently has
        # M. We consider a mismatch OK if its a variant of type START_LOST since
        # the effect is what really matters.
        uniprot_data = pipeline.DataContainer(uniprot_sequence)
        for v_idx in range(data.GetNumVariants()):
            v = data.GetVariant(v_idx)
            if v.pos not in rnum_mapper:
                ost.LogError(
                    f"Cannot map variant ({json.dumps(v.ToJSON())}) to "
                    f"Uniprot sequence. Position not covered => skip"
                )
                continue
            # deal with described special case
            if v.pos == 1 and v.variant_type == VariantType.START_LOST:
                uniprot_ref = uniprot_sequence[rnum_mapper[v.pos] - 1]
                if v.ref != uniprot_ref:
                    msg = "WHO sequence start codon doesnt match with uniprot "
                    msg += "sequence. This is considered OK and the according "
                    msg += "uniprot one letter code is set as reference in the "
                    msg += "imported variant."
                    ost.LogWarning(msg)
                uniprot_v = Variant(
                    uniprot_ref, v.alt, rnum_mapper[v.pos], v.variant_type
                )
            else:
                uniprot_v = Variant(
                    v.ref, v.alt, rnum_mapper[v.pos], v.variant_type
                )
            for meta_key, meta_value in v.meta.items():
                uniprot_v.AddMetaData(meta_key, meta_value)
            # mechanism information gets added separately
            uniprot_v.AddMetaData("mechanism", mechanism)
            try:
                uniprot_v.MatchesSEQRES(uniprot_sequence)
            except Exception as e:
                ost.LogError(
                    f"WHO variant ({json.dumps(v.ToJSON())}) does not "
                    f"map on Uniprot sequence. Tried to match following "
                    f"Uniprot variant: {json.dumps(uniprot_v.ToJSON())} "
                    f"got: {str(e)} => skip"
                )
                continue
            uniprot_data.AddVariant(uniprot_v)
    
        ost.LogInfo(
            f"Mapped {len(uniprot_data.variants)} variants on Uniprot " f"sequence"
        )
        return uniprot_data
    
    
    def ImportStructures(uniprot_sequence, struct_import_pipeline):
        ost.LogInfo(
            f"Import structures for Uniprot AC: " f"{uniprot_sequence.GetName()}"
        )
        data = pipeline.DataContainer(uniprot_sequence)
        # kill logging during import and do sanity checks afterwards
        ost.PushVerbosityLevel(ost.LogLevel.Error)
        data = struct_import_pipeline.Import(data)
        ost.PopVerbosityLevel()
    
        # There is no strict requirement for a full match between structure
        # and Uniprot sequence when importing. We nevertheless check and spit
        # out warnings to allow for a manual intervention by the user
        for s in data.structures:
            uniprot_rnum = 0
            for aln in s.aln:
                for col in aln:
                    if col[0] != "-":
                        uniprot_rnum += 1
                    if col[0] != "-" and col[1] != "-":
                        if col[0] != col[1]:
                            msg = f"SEQRES/ATOMSEQ mismatch when importing "
                            msg += f"structure for Uniprot AC "
                            msg += f"{uniprot_sequence.GetName()}. "
                            msg += f"Expect {col[0]} at Uniprot pos {uniprot_rnum} "
                            msg += f"got {col[1]} for residue "
                            msg += f"{str(col.GetResidue(1))} in structure "
                            msg += f"{s.structure_identifier}. Requires manual "
                            msg += f"checking. Alignment: "
                            msg += os.linesep
                            msg += os.linesep
                            msg += aln.ToString(80)
                            ost.LogWarning(msg)
        ost.LogInfo(f"Imported {len(data.structures)} structures")
    
        # some structures might be equal but refer to different ligands in their
        # meta data. Merge if thats the case.
        structures = list()
        assembly_hashes = list()
        for s_idx in range(data.GetNumStructures()):
            already_there = False
            s = data.GetStructure(s_idx)
            s_hash = s.GetAssemblyHash()
            for added_s, added_s_hash in zip(structures, assembly_hashes):
                if s_hash == added_s_hash:
                    ost.LogInfo(f"Observed 2 structures with equal hash - Merge")
                    already_there = True
                    added_s.meta["pubchemid"] += s.meta["pubchemid"]
                    added_s.meta["method_ligand"] += s.meta["method_ligand"]
                    added_s.meta["ligand_source_pdbid"] += s.meta[
                        "ligand_source_pdbid"
                    ]
                    added_s.meta["ligand_res_num"] += s.meta["ligand_res_num"]
                    added_s.meta["ligand_description"] += s.meta[
                        "ligand_description"
                    ]
                    added_s.meta["activated_form_description"] += s.meta[
                        "activated_form_description"
                    ]
                    break
            if not already_there:
                structures.append(s)
                assembly_hashes.append(s_hash)
    
        merged_data = pipeline.DataContainer(
            data.sequence, data.seq_identifier, data.seq_range
        )
        for s in structures:
            merged_data.AddStructure(s)
    
        if data.GetNumStructures() != merged_data.GetNumStructures():
            ost.LogInfo(
                f"Remaining unique structures: {merged_data.GetNumStructures()}"
            )
    
        # The membrane of transmembrane structures should be nicely aligned to
        # the z-axis. Looks better when displayed in a 3D viewer later on
        mem_annotator = struct_anno.TransmembraneStructAnno()
        mem_anno_pipeline = pipeline.AnnotationPipeline(struct_anno=[mem_annotator])
        anno = mem_anno_pipeline.Run(merged_data)
        aligned_data = pipeline.DataContainer(
            data.sequence, data.seq_identifier, data.seq_range
        )
        for s_idx in range(merged_data.GetNumStructures()):
            s = merged_data.GetStructure(s_idx)
            s_hash = s.GetHash()
            mem_anno = anno["struct_anno"][mem_annotator.Key()][s_hash]
            if mem_anno["is_transmem"]:
                ost.LogInfo(
                    f"{s.structure_identifier} is identified as "
                    f"transmembrane structure - Align membrane axis with "
                    f"z-axis"
                )
                m_param = mem_anno["mem_param"]["axis_transform"]
                m = ost.geom.Mat4(*m_param)
                transformed_assembly = s.assembly.Copy()
                transformed_assembly.EditXCS().ApplyTransform(m)
                new_s = Structure(
                    s.sequence,
                    s.structure_identifier,
                    transformed_assembly,
                    s.segment_query,
                    aln=s.aln,
                    strict=s.strict,
                )
                for k, v in s.meta.items():
                    new_s.AddMetaData(k, v)
                aligned_data.AddStructure(new_s)
            else:
                aligned_data.AddStructure(s)
    
        return aligned_data
    
    
    def _parse_args():
    
        desc = (
            f"Required input files:{os.linesep}"
            + "- Catalogue of mutations in Mycobacterium tuberculosis complex \n"
            + "  and their association with drug resistance. The default\n"
            + "  (WHO-UCN-GTB-PCI-2021.7-eng.xlsx) has been downloaded from \n"
            + "  https://www.who.int/publications/i/item/9789240028173.\n"
            + "  Contains variant annotations that are mapped to the\n"
            + "  M. tuberculosis reference genome H37Rv\n"
            + "  (NCBI Nucleotide database accession ID: NC_000962.3)\n"
            + "- Protein sequences from H37Rv onto which the WHO catalogue maps.\n"
            + "  The default\n"
            + "  (Mycobacterium_tuberculosis_H37Rv_proteins_v4.fasta) has been\n"
            + "  downloaded from https://mycobrowser.epfl.ch/releases\n"
            + "- Reference proteome from Uniprot to which our final data should\n"
            + "  map. The default (uniprot-proteome_UP000001584.fasta) comes\n"
            + "  from Uniprot - strain: ATCC 25618 / H37Rv, last modified:\n"
            + "  May 24, 2021.\n"
            + "- Dictionary JSON format with two keys.\n"
            + '  - "uniprot_mycobrowser_mapping": another dictionary with\n'
            + "    Uniprot AC as key and Gene identifier used by the TB crowd\n"
            + "    as value. (final_annotation.LocusTag column in the\n"
            + "    Genome_indices sheet of WHO excel file described above)\n"
            + "    One JSON output file will be generate for each of those\n"
            + "    key/value pairs.\n"
            + '  - "drugtlc_pubchem_mapping": another dictionary with drug\n'
            + "    three letter codes as referred to in the Genome_indices sheet\n"
            + "    of the WHO excel file as keys. Values are the respective\n"
            + "    pubchem ids.\n"
            + "- A directory with a subdirectory for each Uniprot AC for which\n"
            + "  structures should be imported. The idea here is to always have\n"
            + "  one structure per associated drug, so each of those directories\n"
            + "  again contains a directory with the according Pubchem Id as name.\n"
            + '  in there you need a file called "coords.pdb":\n'
            + "  -<Uniprot AC 1>\n"
            + "    -<Pubchem ID 1>\n"
            + "      -coords.pdb\n"
            + "    -<Pubchem ID 2>\n"
            + "      -coords.pdb\n"
            + "  -<Uniprot AC 2>\n"
            + "    -<Pubchem ID 1>\n"
            + "      -coords.pdb\n"
            + "- Manually curated table with annotations referring to the\n"
            + "  structures in the structure directories."
        )
        parser = argparse.ArgumentParser(
            description=desc, formatter_class=RawTextHelpFormatter
        )
        parser.add_argument(
            "--myctu_catalogue",
            type=str,
            default="WHO-UCN-GTB-PCI-2021.7-eng.xlsx",
            help="Path to Catalogue of mutations in Mycobacterium "
            "tuberculosis complex and their association with drug "
            "resistance.",
        )
        parser.add_argument(
            "--who_reference_proteome",
            type=str,
            default="Mycobacterium_tuberculosis_H37Rv_proteins_v4.fasta",
            help="Reference proteome in fasta format to which the "
            "catalogue refers",
        )
        parser.add_argument(
            "--uniprot_reference_proteome",
            type=str,
            default="uniprot-proteome_UP000001584.fasta",
            help="Proteome to which the variants should finally be "
            "mapped. Sequence correspondence is given in "
            "--mapping_data",
        )
        parser.add_argument(
            "--mapping_data",
            type=str,
            default="mapping_data.json",
            help="mapping as defined in the general description. "
            "Controls the number of data files that are generated.",
        )
        parser.add_argument(
            "--structure_dir",
            type=str,
            default="structures",
            help="Structures that are organized in the described "
            "directory structure",
        )
        parser.add_argument(
            "--structure_data",
            type=str,
            default="tbvar3d_targets_desc.csv",
            help="Annotation data in csv format",
        )
        parser.add_argument(
            "--out_dir",
            type=str,
            default="data",
            help="Data jsons as input for annotation pipeline will "
            "be dumped here",
        )
    
        args = parser.parse_args()
    
        if not os.path.exists(args.myctu_catalogue):
            raise RuntimeError("myctu_catalogue file does not exist")
        if not os.path.exists(args.who_reference_proteome):
            raise RuntimeError("WHO reference proteome file does not exist")
        if not os.path.exists(args.uniprot_reference_proteome):
            raise RuntimeError("Uniprot reference proteome file does not exist")
        if not os.path.exists(args.mapping_data):
            raise RuntimeError("mapping_data file does not exist")
        if not os.path.exists(args.structure_dir):
            raise RuntimeError("structure_dir does not exist")
        if not os.path.exists(args.structure_data):
            raise RuntimeError("structure_data does not exist")
    
        if not os.path.exists(args.out_dir):
            os.makedirs(args.out_dir)
    
        return args
    
    
    def main():
    
        args = _parse_args()
    
        logger = Logger()
        ost.PushLogSink(logger)
        ost.PushVerbosityLevel(ost.LogLevel.Info)
    
        uniprot_reference_sequences = SequenceContainer(
            args.uniprot_reference_proteome
        )
        who_reference_sequences = SequenceContainer(args.who_reference_proteome)
        with open(args.mapping_data) as fh:
            mapping_data = json.load(fh)
        who_var_importer = WHOVarImporter(
            args.myctu_catalogue, mapping_data["drugtlc_pubchem_mapping"]
        )
        var_import_pipeline = pipeline.DataImportPipeline(
            var_importer=[who_var_importer]
        )
        who_struct_importer = WHOStructImporter(
            uniprot_reference_sequences, args.structure_dir, args.structure_data
        )
        struct_import_pipeline = pipeline.DataImportPipeline(
            struct_importer=[who_struct_importer]
        )
    
        # mechanism information comes from structure_data table
        uniprot_mechanism_mapping = dict()
        df = pd.read_csv(args.structure_data)
        for uniprot_ac, mechanism in zip(df["prot_uniprotid"], df["mechanism"]):
            uniprot_mechanism_mapping[uniprot_ac] = mechanism
    
        data_containers = dict()
    
        for uniprot_ac, gene_locus in mapping_data[
            "uniprot_mycobrowser_mapping"
        ].items():
            uniprot_sequence = uniprot_reference_sequences.Get(uniprot_ac)
            who_sequence = who_reference_sequences.Get(gene_locus)
    
            if uniprot_ac in uniprot_mechanism_mapping:
                mechanism = uniprot_mechanism_mapping[uniprot_ac]
            else:
                ost.LogInfo(
                    f"No mechanism info for Uniprot AC {uniprot_ac}. "
                    f'set to "unclear"'
                )
                mechanism = "unclear"
    
            variant_data = ImportVariants(
                uniprot_sequence, who_sequence, var_import_pipeline, mechanism
            )
            structure_data = ImportStructures(
                uniprot_sequence, struct_import_pipeline
            )
    
            # merge structures into variant_data
            for struct_idx in range(structure_data.GetNumStructures()):
                variant_data.AddStructure(structure_data.GetStructure(struct_idx))
    
            with open(
                os.path.join(args.out_dir, uniprot_ac + "_data.json"), "w"
            ) as fh:
                json.dump(variant_data.ToJSON(), fh)
    
    
    if __name__ == "__main__":
        main()