"""Sample transcripts."""

import logging

import numpy as np
import pandas as pd
from tqdm import tqdm


LOG = logging.getLogger(__name__)


def read_abundances(transcripts_file: str) -> pd.DataFrame:
    """Read a transcript-abundance file into a dataframe.

    Args:
        transcripts_file: Input filename, ending in ".csv" or ".tsv".

    Returns:
        A pd.DataFrame with the transcript abundances ("id", "count").

    Raises:
        ValueError: When the input file is neither csv nor tsv.
    """
    columns: list = ["id", "count"]
    # Dispatch on file suffix to the matching pandas reader.
    readers = {".tsv": pd.read_table, ".csv": pd.read_csv}
    for suffix, reader in readers.items():
        if transcripts_file.endswith(suffix):
            return reader(transcripts_file, header=None, names=columns)
    raise ValueError("File type needs to be either csv or tsv")
def filter_df(df: pd.DataFrame, transcripts: list = None) -> pd.DataFrame:
    """Filter annotations to exons with the highest transcript support level, i.e. TSL1.

    `feature` column is filtered on value "exon" and
    `free_text` column is filtered to include the string denoting the highest
    transcript support level ('transcript_support_level "1"').

    If a list of transcript IDs is given, `free_text` column is additionally
    filtered to include one of the IDs.

    Args:
        df: A pd.DataFrame containing an unparsed gtf-file.
        transcripts: List of transcript IDs; no ID filtering when empty/None.

    Returns:
        A pd.DataFrame containing only rows with exon annotations of highest
        transcript support level and, if provided, belonging to one of the
        given transcripts.
    """
    if transcripts is None:
        transcripts = []
    df_filter = df[
        (df["feature"] == "exon")
        & (df["free_text"].str.contains('transcript_support_level "1"'))
    ]
    if len(transcripts) > 0:
        # Bug fix: previously the boolean mask itself was assigned to
        # df_filter (returning a Series of bools); the mask must be used to
        # index the dataframe.
        df_filter = df_filter[
            df_filter["free_text"].str.contains("|".join(transcripts), regex=True)
        ]

    return df_filter


def str_to_dict(s: str) -> dict:
    """Split a gtf attribute string into key/value pairs.

    The string is split on ';' into items, empty items are dropped, and each
    item is split on whitespace into a key/value pair. Quotes are stripped
    from the values.

    Args:
        s: A string of the form 'gene_id "GENE1"; transcript_id "TRANSCRIPT1";'

    Returns:
        A dictionary containing e.g. {'gene_id': 'GENE1', 'transcript_id': 'TRANSCRIPT1'}
    """
    result: dict = {}
    for chunk in s.split(";"):
        if len(chunk) == 0:
            # skip the empty tail produced by a trailing ';'
            continue
        parts = chunk.split()
        result[parts[0]] = parts[1].strip('"')
    return result


def dict_to_str(d: dict) -> str:
    """Format a dictionary in gtf free_text column format.

    Takes e.g. dictionary {'gene_id': 'GENE1', 'transcript_id': 'TRANSCRIPT1'}
    and returns string 'gene_id "GENE1"; transcript_id "TRANSCRIPT1";'.
    Key/value pairs are joined by a space to form an item and items are joined
    by '; ' to form a string ending in ';'.
    If a value is Not a Number (nan), the key/value pair is omitted.

    Args:
        d: A dictionary of the form {'gene_id': 'GENE1', 'transcript_id': 'TRANSCRIPT1'}

    Returns:
        A string, e.g. 'gene_id "GENE1"; transcript_id "TRANSCRIPT1";'.
    """
    items = []
    for key, value in d.items():
        # `value == value` is False only for NaN, so NaN entries are skipped.
        if value == value:
            items.append(f'{key} "{value}"')
    return "; ".join(items) + ";"


def reverse_parse_free_text(df_all: pd.DataFrame) -> pd.DataFrame:
    """Reverse parsing of a gtf-based pd.DataFrame to the standard gtf columns.

    The first 8 columns are constant as defined by gtf-file standards.
    Further columns are assumed to be parsed free-text columns
    (see Gtf.parse_key_value()). The parsed free-text columns are aggregated
    as a dictionary and the dictionary is formatted as a gtf free_text string.

    Args:
        df_all: A pd.DataFrame containing a parsed gtf-file.

    Returns:
        A pd.DataFrame with the columns as defined by gtf-file standards.
    """
    # Define pd.DataFrame containing only parsed free-text columns
    df_free_text = df_all.iloc[:, 8:]
    # Define pd.DataFrame containing only non-parsed columns.
    # Copy so that adding the `free_text` column below does not write into a
    # view of the caller's frame (avoids SettingWithCopyWarning / aliasing).
    df = df_all.iloc[:, :8].copy()
    # Reverse parsing of free-text columns and add the result as column
    # `free_text` to the output pd.DataFrame
    df["free_text"] = df_free_text.agg(pd.Series.to_dict, axis=1).apply(dict_to_str)
    return df


def write_gtf(df: pd.DataFrame, filename: str) -> None:
    """Append a dataframe to file in gtf format.

    Casts the columns to the gtf data types, then appends the rows
    tab-separated, without header or index.

    Args:
        df: A pd.DataFrame containing a gtf-file.
        filename: File to save to.
    """
    # Ensure gtf-conformant column data types before writing.
    csv_options = dict(
        sep="\t",
        header=False,
        index=False,
        quoting=None,
        quotechar="'",
        mode="a",
    )
    df.astype(Gtf.dtypes).to_csv(filename, **csv_options)


def write_header(annotations_file: str) -> None:
    """Write the header of an annotations file.

    The header consists of the tab-delimited gtf column names.

    Args:
        annotations_file: Filename to write header to.
    """
    header = "\t".join(Gtf.dtypes.keys())
    with open(annotations_file, "w", encoding="utf_8") as out:
        out.write(header + "\n")


class Gtf:
    """Class to read a transcript annotations file into a Gtf object.

    Attributes:
        dtypes: A dictionary containing column names and respective data types.
        parsed: A boolean indicating if the pd.DataFrame is parsed.
        original_columns: A list of columns not touched by parsing.
        free_text_columns: A list of columns created during parsing of column `free_text`.
    """

    dtypes = {
        "seqname": object,
        "source": object,
        "feature": object,
        "start": int,
        "end": int,
        "score": object,
        "strand": object,
        "frame": object,
        "free_text": object,
    }

    def __init__(self):
        """Initialize Gtf object."""
        self.df = None
        self.parsed = False
        self.original_columns = list(self.dtypes.keys())
        self.free_text_columns = []

    def read_file(self, annotations_file: str) -> None:
        """Read a gtf-file into the `df` attribute.

        Iterate over chunks of the gtf-file reading 100000 rows at a time.
        Filter chunks for exon annotations of the highest transcript support
        level. Concatenate chunks to get the resulting pd.DataFrame.

        Args:
            annotations_file: Filename of annotations.

        Raises:
            ValueError: The file type is required to be gtf.
        """
        if not annotations_file.endswith("gtf"):
            raise ValueError("File type needs to be gtf")

        reader = pd.read_table(
            annotations_file,
            sep="\t",
            comment="#",
            names=self.dtypes.keys(),
            dtype=self.dtypes,
            chunksize=100000,
            iterator=True,
        )
        self.df = pd.concat([filter_df(chunk) for chunk in reader])

    def from_dataframe(self, df: pd.DataFrame) -> None:
        """Initialize Gtf object from a pandas DataFrame.

        Part of initialization is:
        Set the dataframe attribute.
        Record which columns belong to the free-text part of the gtf-file.
        If there is no column called `free_text`, set the `parsed` attribute
        to True.

        Args:
            df: A pd.DataFrame containing a gtf-file.
        """
        self.free_text_columns = [
            col for col in df.columns if col not in self.original_columns
        ]
        self.df = df
        if "free_text" not in df.columns:
            self.parsed = True

    def parse_key_value(self):
        """Parse key/value pairs from `free_text` column into column `key` with row entry `value`.

        Creates a dataframe with one column per key found in the free-text
        column instead of the `free_text` column and saves it to the Gtf.df
        attribute.

        Raises:
            ValueError: If the dataframe is already parsed.
        """
        # Raise instead of assert so the guard survives `python -O`.
        if self.parsed:
            raise ValueError("Dataframe is already parsed")
        # create dataframe with columns for values in free_text column
        df_free_text = self.df["free_text"].map(str_to_dict).apply(pd.Series)
        # remember which columns come from free_text
        self.free_text_columns = df_free_text.columns
        # join free_text columns to original dataframe and drop the
        # "free_text" column itself
        self.df = self.df.drop("free_text", axis=1)
        self.original_columns = self.df.columns
        self.df = self.df.join(df_free_text, how="inner")
        # remember that current dataframe is parsed, i.e. can't be written
        # in gtf format
        self.parsed = True

    def reverse_parse_free_text(self):
        """Reverse parsing of the `free_text` column.

        Creates a data frame that can be written in gtf format to file.
        Parsed free-text columns are aggregated into `free_text` column
        according to gtf format specification.

        Raises:
            ValueError: If the dataframe is not parsed.
        """
        # Raise instead of assert so the guard survives `python -O`.
        if not self.parsed:
            raise ValueError("Dataframe is not parsed")
        # create dataframe with only free_text columns
        df_free_text = self.df[self.free_text_columns]
        # filter current dataframe to only original columns; copy so the
        # column assignment below does not write into a view
        self.df = self.df[self.original_columns].copy()
        # undo parsing and save result in "free_text" column
        self.df["free_text"] = df_free_text.agg(pd.Series.to_dict, axis=1).apply(
            dict_to_str
        )
        # remember that current dataframe is not parsed
        self.parsed = False

    def pick_transcript(self, transcript_id: str) -> pd.DataFrame:
        """Filter annotations to a given transcript ID.

        Args:
            transcript_id: The transcript ID to select.

        Returns:
            A pd.DataFrame with only the rows belonging to the transcript.
        """
        # Boolean indexing instead of DataFrame.query with an f-string:
        # robust to IDs containing quotes and avoids query-expression parsing.
        return self.df[self.df["transcript_id"] == transcript_id]


class TranscriptGenerator:
    """Class to sample a transcript."""

    def __new__(
        cls,
        transcript_id: str,
        transcript_count: int,
        transcript_df: pd.DataFrame,
        prob_inclusion: float,
    ):
        """Create a TranscriptGenerator, or None if the transcript can't be sampled."""
        strands = transcript_df["strand"].unique()
        if len(transcript_df) == 0:
            LOG.warning(
                "Transcript %s can't be sampled. Annotation is missing", transcript_id
            )
            instance = None
        elif len(strands) > 1:
            LOG.warning(
                "Transcript %s can't be sampled. Transcript generator is not implemented for transcripts with"
                " exons annotated on different strands",
                transcript_id,
            )
            instance = None
        else:
            instance = super().__new__(cls)

        return instance

    def __init__(
        self,
        transcript_id: str,
        transcript_count: int,
        transcript_df: pd.DataFrame,
        prob_inclusion: float,
    ):
        """Initialize TranscriptGenerator object."""
        self.id = transcript_id
        self.count = transcript_count
        self.df = transcript_df
        self.no_exons = len(transcript_df)
        self.strand = self.df["strand"].unique().item()
        self.prob_inclusion = prob_inclusion

    def _get_inclusions(self) -> np.array:
        """Generate inclusions array.

        Each row corresponds to an exon and each column to one sample; the
        number of columns corresponds to the number of samples.

        Returns:
            A boolean np.array, where True means intron inclusion.
        """
        inclusion_arr = np.random.rand(self.no_exons, self.count) < self.prob_inclusion
        # The last exon has no following intron to include, regardless of
        # strand orientation (both branches previously set the same row).
        if self.strand in ("+", "-"):
            inclusion_arr[-1, :] = False

        return inclusion_arr

    def _get_unique_inclusions(self) -> tuple[list, np.array, np.array]:
        """Deduplicate sampled intron-inclusion patterns and name each unique one.

        Returns:
            - List of names for generated transcripts.
            - A boolean np.array where columns correspond to generated
              transcripts and rows to intron inclusion.
            - A np.array containing the sample number per generated
              inclusions, i.e. transcript.
        """
        inclusion_arr = self._get_inclusions()
        # Unique intron inclusion arrays and counts
        inclusion_arr_unique, counts = np.unique(
            inclusion_arr, axis=1, return_counts=True
        )
        # Name for each generated transcript. A pattern without any inclusion
        # is the original transcript and keeps its plain ID.
        # Bug fix: `array is False` is an identity check that is always
        # False, so the original transcript never kept its plain ID.
        names = []
        for i in range(inclusion_arr_unique.shape[1]):
            if not inclusion_arr_unique[:, i].any():
                names.append(self.id)
            else:
                names.append(f"{self.id}_{i}")

        return names, inclusion_arr_unique, counts

    def _get_df(self, inclusions: np.array, transcript_id: str) -> pd.DataFrame:
        """Build annotation rows for one generated transcript.

        Takes the dataframe filtered to one transcript and a boolean vector
        denoting intron inclusions, and extends exon boundaries into the
        following intron where included.

        Args:
            inclusions: A boolean vector denoting intron inclusion.
            transcript_id: The transcript id.

        Returns:
            The generated transcript as a pd.DataFrame.
        """
        df_generated = self.df.copy()
        if self.strand == "+":
            original_end = df_generated["end"]
            # extend an included exon up to just before the next exon's start
            df_generated["end"] = np.where(
                inclusions,
                df_generated["start"].shift(periods=-1, fill_value=-1) - 1,
                original_end,
            )
        if self.strand == "-":
            original_start = df_generated["start"]
            # extend an included exon down to just after the next exon's end
            df_generated["start"] = np.where(
                inclusions,
                df_generated["end"].shift(periods=-1, fill_value=-1) + 1,
                original_start,
            )

        # exons with an included intron get a derived, per-row exon ID
        original_id = df_generated["exon_id"]
        df_generated["exon_id"] = np.where(
            inclusions,
            df_generated["exon_id"] + "_" + np.arange(len(df_generated)).astype(str),
            original_id,
        )

        df_generated["transcript_id"] = transcript_id
        return df_generated

    def write_sequences(self, filename: str) -> None:
        """Append sampled transcripts to file.

        Each line has the form "<generated id>,<original id>,<count>".

        Args:
            filename: Output csv filename.
        """
        ids, _, counts = self._get_unique_inclusions()
        with open(filename, "a", encoding="utf_8") as fh:
            for transcript_id, transcript_count in zip(ids, counts):
                fh.write(f"{transcript_id},{self.id},{transcript_count}\n")

    def write_annotations(self, filename: str) -> None:
        """Generate annotations in gtf format for the sampled transcript.

        Args:
            filename: Output gtf-filename.

        Raises:
            ValueError: If the given transcript ID could not be sampled.
        """
        ids, inclusions, _ = self._get_unique_inclusions()
        n_unique = len(ids)

        df = pd.concat(
            [self._get_df(inclusions[:, i], ids[i]) for i in range(n_unique)]
        )
        df = reverse_parse_free_text(df)

        write_gtf(df, filename)
        LOG.debug("Transcript %s sampled", self.id)
def sample_transcripts(
    input_transcripts_file: str,
    input_annotations_file: str,
    prob_inclusion: float,
    output_transcripts_file: str,
    output_annotations_file: str,
):
    """Read input files, iterate over transcript IDs, sample each transcript and save results.

    Args:
        input_transcripts_file: Filename of transcript abundances, needs to be csv or tsv.
        input_annotations_file: Filename of annotations, needs to be gtf.
        prob_inclusion: Probability of intron inclusion, needs to be float in range [0,1].
        output_transcripts_file: Filename of file to write sampled transcripts to.
        output_annotations_file: Filename of file to write generated annotations to.
    """
    LOG.info("Probability of intron inclusion: %s", str(prob_inclusion))
    LOG.info("Parsing transcript abundances...")
    transcripts = read_abundances(input_transcripts_file)
    # Make sure that abundance is positive
    transcripts = transcripts[transcripts["count"] > 0]
    LOG.info("Done parsing...")
    LOG.info("Parsing annotations...")
    annotations = Gtf()
    annotations.read_file(input_annotations_file)
    annotations.parse_key_value()
    LOG.info("Done parsing...")
    LOG.info("Start sampling transcripts...")
    # Set up output file, write header once and append data in loop
    write_header(output_annotations_file)

    for _, row in tqdm(transcripts.iterrows()):
        transcript_id = row["id"]
        transcript_count = row["count"]

        transcript_df = annotations.pick_transcript(transcript_id)
        # Use a dedicated name instead of shadowing the abundances dataframe.
        # TranscriptGenerator.__new__ returns None for transcripts that can't
        # be sampled; check explicitly instead of swallowing AttributeError,
        # which would also have hidden unrelated bugs.
        generator = TranscriptGenerator(
            transcript_id,
            transcript_count,
            transcript_df,
            prob_inclusion=prob_inclusion,
        )
        if generator is None:
            continue
        generator.write_annotations(output_annotations_file)
        generator.write_sequences(output_transcripts_file)
    LOG.info("Done.")