import logging

import numpy as np
import pandas as pd
from tqdm import tqdm


LOG = logging.getLogger(__name__)


def read_abundances(transcripts_file: str) -> pd.DataFrame:
    """Read abundance file into dataframe

    Args:
        transcripts_file (str): Input filename

    Returns:
        pd.DataFrame: Transcript abundances ("id", "count")

    Raises:
        ValueError: When the input file is neither CSV nor TSV.
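
    Example (illustrative; "abundances.tsv" is a hypothetical two-column
    file without a header):

        df = read_abundances("abundances.tsv")
        list(df.columns)  # ["id", "count"]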
    """
    cols: list = ["id", "count"]
    if transcripts_file.endswith(".tsv"):
        return pd.read_table(transcripts_file, header=None, names=cols)
    elif transcripts_file.endswith(".csv"):
        return pd.read_csv(transcripts_file, header=None, names=cols)
    else:
        raise ValueError("File type needs to be either csv or tsv")


def filter_df(df: pd.DataFrame, transcripts: list = []) -> pd.DataFrame:
    """Filters inclusions of exons and the highest transcript support level (TSL1)
    
    Data is filtered from the pd.DataFrame to include the string entery of "exons", based on the number of transcripts
    it will choose the transcript with the highest transcript support level (TSL1-5). It will filter a list of transcript
    IDs if that is given as an input.

    Args: 
        df: pd.DataFrame, transcript: list
    
    Returns: 
        df_filter: filter strings from pd.DataFrame ("exons", "transcript_support_level "1"")
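
    Example (illustrative; assumes df holds GTF annotations with "feature"
    and "free_text" columns):

        exons = filter_df(df, transcripts=["ENST00000456328"])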
    """
    # Filter annotations to exon and highest transcript support level.
    # If list of transcript ids is given, filter for that as well.
    df_filter = df[
        (df["feature"] == "exon")
        & (df["free_text"].str.contains('transcript_support_level "1"'))
    ]
    if len(transcripts) > 0:
        # a DataFrame has no .str accessor; match transcript ids in the
        # free_text column instead
        df_filter = df_filter[
            df_filter["free_text"].str.contains("|".join(transcripts), regex=True)
        ]

    return df_filter


def str_to_dict(s: str) -> dict:
    """Split between key/value pairs

    Creates a dictionary based on the split between key and value pairs from the item_list
    Also removes quotes values, empty list items and then returns the dictionary
    
    Args: 
        s: str
Andri Fraenkl's avatar
Andri Fraenkl committed

    Returns:
        items in item_list
Andri Fraenkl's avatar
Andri Fraenkl committed
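
    Example (illustrative):

        str_to_dict('gene_id "ENSG1"; transcript_id "ENST1";')
        # {'gene_id': 'ENSG1', 'transcript_id': 'ENST1'}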
    """
    # split between key/value pairs
    # remove empty list items and split key, value pairs
    item_list: list = [x.split() for x in s.split(";") if len(x) > 0]
    # remove quotes for values and return dictionary
    return {item[0]: item[1].strip('"') for item in item_list}


def dict_to_str(d: dict) -> str:
    """Joins a key/value pair with a space in a list

    Takes key/value pairs from a dictionary and joins them with a space on a list
    Joins items from said list that are marked with ; and end with ;
    Checks if the value is Not a Number (nan)
    
    Args: 
        "key", "value" (str)

    Returns: 
        s: str
Andri Fraenkl's avatar
Andri Fraenkl committed
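
    Example (illustrative):

        dict_to_str({"gene_id": "ENSG1", "exon_number": "1"})
        # 'gene_id "ENSG1"; exon_number "1";'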
    """
    # join key, value pairs from dictionary with a space in a list,
    # then join items in list by ;
    # end on ;
    # check if value is nan
    s: str = (
        "; ".join([f'{key} "{value}"' for key, value in d.items() if value == value])
        + ";"
    )
    return s


def reverse_parse_free_text(df_all: pd.DataFrame) -> pd.DataFrame:
    """Creates columns that are well defnined by .gtf file standards
    
    The first 8 defined columns are constant as defined by gtf file standards
    Further columns are assumed to be free text columns and superflous
    
    Args:
        df_all: DataFrame
Andri Fraenkl's avatar
Andri Fraenkl committed

    Returns: 
        DataFrame with 8 columns as defined by gtf file standards
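
    Example (illustrative; df_parsed has the eight standard columns followed
    by attribute columns such as "gene_id"):

        df_gtf = reverse_parse_free_text(df_parsed)
        list(df_gtf.columns)[-1]  # "free_text"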
    """
    # the first 8 columns should be constant according to gtf file standard
    # we assume that further columns are parsed free text columns
    df_free_text = df_all.iloc[:, 8:]

    df = df_all.iloc[:, :8]
    df["free_text"] = df_free_text.agg(pd.Series.to_dict, axis=1).apply(dict_to_str)
    return df


def write_gtf(df: pd.DataFrame, filename: str) -> None:
    """Checks all data types in the pd.DataFrame

    Goes through the updated pd.DataFrame after formatting to gtf file standards
    and checks if the data types have been formatted correctly. 
    
    Args: 
        Types ("filename", "sep", "header", "index", "quoting", "quotechar", "mode")
        Filename: str

    Returns: 
        DataFrame defined correctly via gtf.dtypes
    """
    # Make sure the data types are correct.
    df = df.astype(Gtf.dtypes)

    df.to_csv(
        filename,
        sep="\t",
        header=False,
        index=False,
        quoting=None,
        quotechar="'",
        mode="a",
    )


def write_header(annotations_file: str) -> None:
    """Opens up an annotation file with the datatypes defined as correct

    Args:
    
    Returns:

    """
    with open(annotations_file, "w") as fh:
        fh.write("\t".join(Gtf.dtypes.keys()) + "\n")


class Gtf:
    """Class to read transcripts annotations file and parse it into a pandas Dataframe.

    Args:
        annotations_file: Path to gtf file.

    Attributes:
        annotations_file: File with transcript annotation of the genome
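
    Example (illustrative; "annotations.gtf" is a hypothetical path):

        gtf = Gtf()
        gtf.read_file("annotations.gtf")
        gtf.parse_free_text()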

    """

    dtypes = {
        "seqname": object,
        "source": object,
        "feature": object,
        "start": int,
        "end": int,
        "score": object,
        "strand": object,
        "frame": object,
        "free_text": object,
    }

    def __init__(self):
        self.parsed = False
        self.original_columns = list(self.dtypes.keys())
        self.free_text_columns = []

    def read_file(self, annotations_file: str) -> None:
        """Defines a limit for larger input Files. Iterates lines and Filters on bool.
        
        If the chuncksize of the inputed annotation file is larger than 100000 it will
        iterate over the lines and filters before saving.

        Args:
            Path to annotations_file: str
Andri Fraenkl's avatar
Andri Fraenkl committed

        Returns:
            If the file chunk is over a certain size it will reiterate the lines and files.

        Raises:
            ValueError: The file type is required to be .gtf
        """
        # for large annotation files, iterate over lines and filter before saving to dataframe
        if not annotations_file.endswith("gtf"):
            raise ValueError("File type needs to be gtf")

        reader = pd.read_table(
            annotations_file,
            sep="\t",
            comment="#",
            names=self.dtypes.keys(),
            dtype=self.dtypes,
            chunksize=100000,
            iterator=True,
        )
        self.df = pd.concat([filter_df(chunk) for chunk in reader])

    def from_dataframe(self, df: pd.DataFrame) -> None:
        """ Initializes Gtf object from pandas Dataframe. 
        Part of initialization is: 
        Set dataframe attribute 
        Check which columns belong to the free-text part of the GTF-file.
        Check if there are no columns called free-text and if so, sets the value of parsed attribute to TRUE.

        Args:
            df: DataFrame
        
        Returns:
            None     
        """
        self.free_text_columns = [
            col for col in df.columns if col not in self.original_columns
        ]
        self.df = df
        if not "free_text" in df.columns:
            self.parsed = True

    def parse_free_text(self):
        """Creates a self DataFrame with columns for parsed free text

        Creates a dataframe with columns for values in the free text column and then joins
        the free_text column to orginal dataframe. Drops the free_text column itself.
        
        Args:

        Returns:
            Parsed DataFrame with free_text column joined with orginal and dropped. 
         """
        assert not self.parsed
        # create dataframe with columns for values in free_text column
        df_free_text = self.df["free_text"].map(str_to_dict).apply(pd.Series)
        # remember which columns come from free_text
        self.free_text_columns = df_free_text.columns
        # join free_text columns to original dataframe and drop the "free_text" column itself
        self.df = self.df.drop("free_text", axis=1)
        self.original_columns = self.df.columns
        self.df = self.df.join(df_free_text, how="inner")
        # remember that current dataframe is parsed, i.e. can't be written in gtf format
        self.parsed = True

    def reverse_parse_free_text(self):
        """Creates a reversed self DataFrame with columns for non parsed free text
        Creates a data frame with only free_text columns and then filters current dataframe down
        to only orginal_columns leaving the free_text column untouched. The parsing is undone and the results 
        saved in the free_text column and defined as non parsed.
Andri Fraenkl's avatar
Andri Fraenkl committed
        
        Args:

        Returns:
         """
        assert self.parsed
        # create dataframe with only free_text columns
        df_free_text = self.df[self.free_text_columns]
        # filter current dataframe to only original columns, except "free_text" column
        self.df = self.df[self.original_columns]
        # undo parsing and save result in "free_text" column
        self.df["free_text"] = df_free_text.agg(pd.Series.to_dict, axis=1).apply(
            dict_to_str
        )
        # remember that current dataframe is not parsed
        self.parsed = False

    def pick_transcript(self, transcript_id: str) -> pd.DataFrame:
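        """Return the annotation rows of a single transcript (requires parsed free text)."""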
        return self.df.query(f"transcript_id == '{transcript_id}'")


class TranscriptGenerator:
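    """Generate intron-inclusion variants of a single transcript and write them to file."""
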
    def __init__(
        self,
        transcript_id: str,
        transcript_count: int,
        transcript_df: pd.DataFrame,
        prob_inclusion: float,
    ):
        assert len(transcript_df) > 0
        assert transcript_count > 0
        assert (prob_inclusion >= 0) and (prob_inclusion <= 1)

        self.id = transcript_id
        self.count = transcript_count
        self.df = transcript_df
        self.no_exons = len(transcript_df)
        self.strand = self.df["strand"].unique().item()
        self.prob_inclusion = prob_inclusion

    def _get_inclusions(self) -> np.array:
        """Generate inclusions array where each column corresponds to one sample and the number of columns corresponds to the number of samples.

        Returns:
            np.array: inclusions, where True means intron inclusion
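
        Example (illustrative):

            arr = self._get_inclusions()
            arr.shape  # (no_exons, count); the last row is all False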
        """
        inclusion_arr = np.random.rand(self.no_exons, self.count) < self.prob_inclusion
        # the intron after the last exon cannot be included, regardless of
        # strand (both branches of the original condition were identical)
        inclusion_arr[-1, :] = False

        return inclusion_arr

    def _get_unique_inclusions(self) -> tuple[list, np.array, np.array]:
        """Inclusion of unique intron inclusion via arrays and counts and name generation of each unique count.
        
        Args:

        Returns:
            Bool: If true include unique intron array and count and create unique name and count.
        """
        inclusion_arr = self._get_inclusions()
        # Unique intron inclusion arrays and counts
        inclusion_arr_unique, counts = np.unique(
            inclusion_arr, axis=1, return_counts=True
        )
        # Name for each generated transcript
        names = []
        for i in range(inclusion_arr_unique.shape[1]):
            if not np.any(inclusion_arr_unique[:, i]):
                names.append(self.id)
            else:
                names.append(f"{self.id}_{i}")

        return names, inclusion_arr_unique, counts

    def _get_df(self, inclusions: np.array, transcript_id: str) -> pd.DataFrame:
        """Take as input a dataframe filtered to one transcript and a boolean vector denoting intron inclusions.

        Args:
            inclusions (np.array): boolean vector denoting intron inclusion
            transcript_id (str): transcript id

        Returns:
            pd.DataFrame: Derived dataframe
        """
        df_generated = self.df.copy()
        if self.strand == "+":
            original_end = df_generated["end"]
            df_generated["end"] = np.where(
                inclusions,
                df_generated["start"].shift(periods=-1, fill_value=-1) - 1,
                original_end,
            )
        if self.strand == "-":
            original_start = df_generated["start"]
            df_generated["start"] = np.where(
                inclusions,
                df_generated["end"].shift(periods=-1, fill_value=-1) + 1,
                original_start,
            )

        original_id = df_generated["exon_id"]
        df_generated["exon_id"] = np.where(
            inclusions,
            df_generated["exon_id"] + "_" + np.arange(len(df_generated)).astype(str),
            original_id,
        )

        df_generated["transcript_id"] = transcript_id
        return df_generated

    def generate_transcripts(self, filename: str) -> None:
        """Write transcripts to file.

        Args:
            filename (str): Output csv filename
        """
        ids, inclusions, counts = self._get_unique_inclusions()
        with open(filename, "a") as fh:
            for transcript_id, transcript_count in zip(ids, counts):
                fh.write(f"{transcript_id},{self.id},{transcript_count}\n")

    def generate_annotations(self, filename: str) -> None:
        """Generates a gtf file including IDs, inclusion, and counts from reverse parse free text
        
        Args:
            Filename: str
        
        Returns:
            Gtf file with filename
        
        Raises:
            ValueError: If self.ID could not be sampled (No ID generated for the inclusion transcript) 
        """
        ids, inclusions, counts = self._get_unique_inclusions()
        n_unique = len(ids)

        try:
            df = pd.concat(
                [self._get_df(inclusions[:, i], ids[i]) for i in range(n_unique)]
            )
            df = reverse_parse_free_text(df)

            write_gtf(df, filename)
            LOG.debug(f"Transcript {self.id} sampled")
        except ValueError:
            LOG.error(f"Transcript {self.id} could not be sampled.")


def sample_transcripts(
    input_transcripts_file: str,
    input_annotations_file: str,
    prob_inclusion: float,
    output_transcripts_file: str,
    output_annotations_file: str,
):
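    """Sample transcripts and write the results to file.

    For each transcript in the abundance file, samples intron-inclusion
    variants of its annotation and appends the derived transcripts and
    annotations to the output files.

    Args:
        input_transcripts_file: CSV or TSV file with transcript abundances
            ("id", "count").
        input_annotations_file: GTF file with transcript annotations.
        prob_inclusion: Probability of intron inclusion, in [0, 1].
        output_transcripts_file: Output CSV file of sampled transcripts.
        output_annotations_file: Output GTF file of derived annotations.

    Example (illustrative; all paths are hypothetical):

        sample_transcripts(
            "abundances.tsv",
            "annotations.gtf",
            prob_inclusion=0.05,
            output_transcripts_file="sampled_transcripts.csv",
            output_annotations_file="sampled_annotations.gtf",
        )
    """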
    transcripts = read_abundances(input_transcripts_file)

    LOG.info("Parsing annotations...")
    annotations = Gtf()
    annotations.read_file(input_annotations_file)
    annotations.parse_free_text()
    LOG.info("Done parsing...")
    LOG.info("Start sampling transcripts...")
    # Set up output file, write header once and append data in loop
    write_header(output_annotations_file)

    for _, row in tqdm(transcripts.iterrows()):
        transcript_id = row["id"]
        transcript_count = row["count"]

        transcript_df = annotations.pick_transcript(transcript_id)
        # use a separate name so the transcripts dataframe being iterated
        # over is not shadowed
        transcript_generator = TranscriptGenerator(
            transcript_id,
            transcript_count,
            transcript_df,
            prob_inclusion=prob_inclusion,
        )
        transcript_generator.generate_annotations(output_annotations_file)
        transcript_generator.generate_transcripts(output_transcripts_file)
    LOG.info("Done.")