import logging
from typing import Optional

import numpy as np
import pandas as pd
from tqdm import tqdm

LOG = logging.getLogger(__name__)


def read_abundances(transcripts_file: str) -> pd.DataFrame:
    """Read abundance file into dataframe.

    Args:
        transcripts_file (str): Input filename.

    Returns:
        pd.DataFrame: Transcript abundances ("id", "count").

    Raises:
        ValueError: When the input file is neither csv nor tsv.
    """
    cols: list = ["id", "count"]
    if transcripts_file.endswith(".tsv"):
        return pd.read_table(transcripts_file, header=None, names=cols)
    elif transcripts_file.endswith(".csv"):
        return pd.read_csv(transcripts_file, header=None, names=cols)
    else:
        raise ValueError("File type needs to be either csv or tsv")


def filter_df(df: pd.DataFrame, transcripts: Optional[list] = None) -> pd.DataFrame:
    """Filter annotations to exons with the highest transcript support level.

    Keeps rows whose feature is "exon" and whose free-text field contains
    'transcript_support_level "1"' (TSL1). If a list of transcript IDs is
    given, the result is further restricted to those transcripts.

    Args:
        df: Dataframe of GTF annotations with a "free_text" column.
        transcripts: Optional list of transcript IDs to keep.

    Returns:
        pd.DataFrame: Filtered annotations.
    """
    if transcripts is None:
        transcripts = []
    df_filter = df[
        (df["feature"] == "exon")
        & (df["free_text"].str.contains('transcript_support_level "1"'))
    ]
    # if a list of transcript ids is given, filter for those as well
    if len(transcripts) > 0:
        df_filter = df_filter[
            df_filter["free_text"].str.contains("|".join(transcripts), regex=True)
        ]
    return df_filter


def str_to_dict(s: str) -> dict:
    """Parse a GTF free-text field into a dictionary.

    Splits the field on ";" into key/value pairs, drops empty items, and
    strips the quotes around values.

    Args:
        s: Free-text field of a GTF line.

    Returns:
        dict: Attribute names mapped to their unquoted values.
    """
    # split into key/value pairs, dropping empty items
    item_list: list = [x.split() for x in s.split(";") if len(x) > 0]
    # remove quotes around values and return dictionary
    return {item[0]: item[1].strip('"') for item in item_list}


def dict_to_str(d: dict) -> str:
    """Serialize a dictionary into a GTF free-text field.

    Joins each key/value pair with a space and wraps the value in double
    quotes, then joins the pairs with "; " and terminates with ";".
    NaN values are skipped; NaN is the only value that does not compare
    equal to itself, which is what the `value == value` check exploits.

    Args:
        d: Attribute names mapped to values.

    Returns:
        str: GTF-formatted free-text field.
    """
    s: str = (
        "; ".join([f'{key} "{value}"' for key, value in d.items() if value == value])
        + ";"
    )
    return s


def reverse_parse_free_text(df_all: pd.DataFrame) -> pd.DataFrame:
    """Collapse parsed free-text columns back into a single column.

    The first 8 columns are fixed by the GTF file standard; all further
    columns are assumed to be parsed free-text columns and are serialized
    back into one "free_text" column.

    Args:
        df_all: Dataframe with the 8 standard columns followed by parsed
            free-text columns.

    Returns:
        pd.DataFrame: Dataframe with the 8 standard columns and a
            "free_text" column.
    """
    df_free_text = df_all.iloc[:, 8:]
    df = df_all.iloc[:, :8].copy()
    df["free_text"] = df_free_text.agg(pd.Series.to_dict, axis=1).apply(dict_to_str)
    return df
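

# Illustrative round trip between the two free-text representations
# (a sketch; the attribute values below are made up for the example):
#
#     >>> str_to_dict('gene_id "g1"; transcript_id "t1";')
#     {'gene_id': 'g1', 'transcript_id': 't1'}
#     >>> dict_to_str({"gene_id": "g1", "transcript_id": "t1"})
#     'gene_id "g1"; transcript_id "t1";'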


def write_gtf(df: pd.DataFrame, filename: str) -> None:
    """Append annotations to a GTF file.

    Casts the columns to the data types defined in `Gtf.dtypes` and
    appends the rows, tab-separated and without header or index, to the
    given file.

    Args:
        df: Dataframe with the columns defined in `Gtf.dtypes`.
        filename: Output GTF filename.
    """
    # make sure the data types are correct
    df = df.astype(Gtf.dtypes)
    df.to_csv(
        filename,
        sep="\t",
        header=False,
        index=False,
        quoting=None,
        quotechar="'",
        mode="a",
    )


def write_header(annotations_file: str) -> None:
    """Write the column names defined in `Gtf.dtypes` as file header.

    Args:
        annotations_file: Output filename; an existing file is overwritten.
    """
    with open(annotations_file, "w") as fh:
        fh.write("\t".join(Gtf.dtypes.keys()) + "\n")
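

# Typical output-file pattern (a sketch): write the header once, then
# append generated annotations to the same file. `annotations_df` and
# "out.gtf" are hypothetical stand-ins.
#
#     >>> write_header("out.gtf")
#     >>> write_gtf(annotations_df, "out.gtf")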


class Gtf:
    """Class to read a transcript annotations file and parse it into a
    pandas Dataframe.

    Attributes:
        dtypes: Column names and data types defined by the GTF standard.
        parsed: Whether the free-text column has been parsed.
        original_columns: Columns defined by the GTF standard.
        free_text_columns: Columns parsed out of the free-text column.
    """

    dtypes = {
        "seqname": object,
        "source": object,
        "feature": object,
        "start": int,
        "end": int,
        "score": object,
        "strand": object,
        "frame": object,
        "free_text": object,
    }

    def __init__(self):
        self.parsed = False
        self.original_columns = list(self.dtypes.keys())
        self.free_text_columns = []

    def read_file(self, annotations_file: str) -> None:
        """Read and filter an annotations file into the dataframe attribute.

        For large annotation files, iterates over chunks of 100000 lines
        and filters each chunk (see `filter_df`) before concatenating them
        into the dataframe.

        Args:
            annotations_file: Path to the annotations file.

        Raises:
            ValueError: When the file type is not gtf.
        """
        if not annotations_file.endswith("gtf"):
            raise ValueError("File type needs to be gtf")

        reader = pd.read_table(
            annotations_file,
            sep="\t",
            comment="#",
            names=list(self.dtypes.keys()),
            dtype=self.dtypes,
            chunksize=100000,
            iterator=True,
        )
        self.df = pd.concat([filter_df(chunk) for chunk in reader])

    def from_dataframe(self, df: pd.DataFrame) -> None:
        """Initialize Gtf object from a pandas Dataframe.

        Sets the dataframe attribute, records which columns belong to the
        free-text part of the GTF file, and marks the object as parsed if
        no "free_text" column is present.

        Args:
            df: Dataframe of annotations.
        """
        self.free_text_columns = [
            col for col in df.columns if col not in self.original_columns
        ]
        self.df = df
        if "free_text" not in df.columns:
            self.parsed = True

    def parse_free_text(self):
        """Expand the "free_text" column into one column per attribute.

        Creates a dataframe with one column per key occurring in the
        "free_text" column, joins it to the original dataframe, and drops
        the "free_text" column itself.
        """
        assert not self.parsed
        # create dataframe with one column per attribute in the free_text column
        df_free_text = self.df["free_text"].map(str_to_dict).apply(pd.Series)
        # remember which columns come from free_text
        self.free_text_columns = df_free_text.columns
        # join free_text columns to original dataframe and drop the "free_text" column itself
        self.df = self.df.drop("free_text", axis=1)
        self.original_columns = self.df.columns
        self.df = self.df.join(df_free_text, how="inner")
        # remember that the current dataframe is parsed,
        # i.e. it can't be written in gtf format as is
        self.parsed = True

    def reverse_parse_free_text(self):
        """Collapse the parsed attribute columns back into "free_text".

        Restricts the dataframe to the original GTF columns and serializes
        the free-text columns back into a single "free_text" column,
        undoing `parse_free_text`.
        """
        assert self.parsed
        # create dataframe with only free_text columns
        df_free_text = self.df[self.free_text_columns]
        # filter current dataframe to only original columns, except "free_text"
        self.df = self.df[self.original_columns].copy()
        # undo parsing and save result in "free_text" column
        self.df["free_text"] = df_free_text.agg(pd.Series.to_dict, axis=1).apply(
            dict_to_str
        )
        # remember that the current dataframe is not parsed
        self.parsed = False

    def pick_transcript(self, transcript_id: str) -> pd.DataFrame:
        """Return the annotations of a single transcript."""
        return self.df.query(f"transcript_id == '{transcript_id}'")
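

# Illustrative parse/reverse-parse round trip (a sketch; the file name and
# transcript id are hypothetical):
#
#     >>> annotations = Gtf()
#     >>> annotations.read_file("annotations.gtf")
#     >>> annotations.parse_free_text()            # attributes become columns
#     >>> exons = annotations.pick_transcript("ENST00000000001")
#     >>> annotations.reverse_parse_free_text()    # back to a writable form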


class TranscriptGenerator:
    """Generate intron-inclusion variants of a single transcript."""

    def __init__(
        self,
        transcript_id: str,
        transcript_count: int,
        transcript_df: pd.DataFrame,
        prob_inclusion: float,
    ):
        assert len(transcript_df) > 0
        assert transcript_count > 0
        assert 0 <= prob_inclusion <= 1

        self.id = transcript_id
        self.count = transcript_count
        self.df = transcript_df
        self.no_exons = len(transcript_df)
        self.strand = self.df["strand"].unique().item()
        self.prob_inclusion = prob_inclusion

    def _get_inclusions(self) -> np.ndarray:
        """Generate a boolean inclusions array.

        Each row corresponds to one exon and each column to one sampled
        copy of the transcript; True means the intron following that exon
        is included. The terminal exon has no following intron, so its row
        is always False.

        Returns:
            np.ndarray: Inclusions, where True means intron inclusion.
        """
        inclusion_arr = np.random.rand(self.no_exons, self.count) < self.prob_inclusion
        if self.strand in ("+", "-"):
            inclusion_arr[-1, :] = False
        return inclusion_arr

    def _get_unique_inclusions(self) -> tuple[list, np.ndarray, np.ndarray]:
        """Deduplicate the sampled inclusion patterns.

        Collapses identical inclusion columns, counts how often each
        pattern occurs, and generates a name for each unique pattern. The
        all-False pattern keeps the original transcript id; every other
        pattern gets a suffixed id.

        Returns:
            tuple: Names, unique inclusion patterns (one column per
                pattern), and the count of each pattern.
        """
        inclusion_arr = self._get_inclusions()
        # unique intron inclusion patterns and their counts
        inclusion_arr_unique, counts = np.unique(
            inclusion_arr, axis=1, return_counts=True
        )
        # name for each generated transcript
        names = []
        for i in range(inclusion_arr_unique.shape[1]):
            if not inclusion_arr_unique[:, i].any():
                names.append(self.id)
            else:
                names.append(f"{self.id}_{i}")
        return names, inclusion_arr_unique, counts

    def _get_df(self, inclusions: np.ndarray, transcript_id: str) -> pd.DataFrame:
        """Build the annotation rows for one generated transcript.

        Takes the dataframe filtered to one transcript and a boolean
        vector denoting intron inclusions, and extends each exon up to the
        start of the next one wherever an intron is included.

        Args:
            inclusions (np.ndarray): Boolean vector denoting intron inclusion.
            transcript_id (str): Id of the generated transcript.

        Returns:
            pd.DataFrame: Derived dataframe.
        """
        df_generated = self.df.copy()
        if self.strand == "+":
            original_end = df_generated["end"]
            df_generated["end"] = np.where(
                inclusions,
                df_generated["start"].shift(periods=-1, fill_value=-1) - 1,
                original_end,
            )
        if self.strand == "-":
            original_start = df_generated["start"]
            df_generated["start"] = np.where(
                inclusions,
                df_generated["end"].shift(periods=-1, fill_value=-1) + 1,
                original_start,
            )

        original_id = df_generated["exon_id"]
        df_generated["exon_id"] = np.where(
            inclusions,
            df_generated["exon_id"] + "_" + np.arange(len(df_generated)).astype(str),
            original_id,
        )
        df_generated["transcript_id"] = transcript_id
        return df_generated

    def generate_transcripts(self, filename: str) -> None:
        """Append the generated transcript ids and counts to a csv file.

        Args:
            filename (str): Output csv filename.
        """
        ids, _, counts = self._get_unique_inclusions()
        with open(filename, "a") as fh:
            for transcript_id, transcript_count in zip(ids, counts):
                fh.write(f"{transcript_id},{self.id},{transcript_count}\n")

    def generate_annotations(self, filename: str) -> None:
        """Append the annotations of the generated transcripts to a gtf file.

        Builds one dataframe per unique inclusion pattern, collapses the
        parsed free-text columns, and appends the result to the given
        file. If no dataframe can be built, an error is logged instead.

        Args:
            filename: Output gtf filename.
        """
        ids, inclusions, counts = self._get_unique_inclusions()
        n_unique = len(ids)

        try:
            df = pd.concat(
                [self._get_df(inclusions[:, i], ids[i]) for i in range(n_unique)]
            )
            df = reverse_parse_free_text(df)
            write_gtf(df, filename)
            LOG.debug(f"Transcript {self.id} sampled")
        except ValueError:
            LOG.error(f"Transcript {self.id} could not be sampled.")


def sample_transcripts(
    input_transcripts_file: str,
    input_annotations_file: str,
    prob_inclusion: float,
    output_transcripts_file: str,
    output_annotations_file: str,
):
    """Sample intron-inclusion variants for each transcript in the input.

    Args:
        input_transcripts_file: Csv or tsv file with transcript abundances.
        input_annotations_file: Gtf file with genome annotations.
        prob_inclusion: Probability of intron inclusion, in [0, 1].
        output_transcripts_file: Csv file for the sampled abundances.
        output_annotations_file: Gtf file for the sampled annotations.
    """
    transcripts = read_abundances(input_transcripts_file)

    LOG.info("Parsing annotations...")
    annotations = Gtf()
    annotations.read_file(input_annotations_file)
    annotations.parse_free_text()
    LOG.info("Done parsing...")

    LOG.info("Start sampling transcripts...")
    # set up output file: write the header once and append data in the loop
    write_header(output_annotations_file)
    for _, row in tqdm(transcripts.iterrows()):
        transcript_id = row["id"]
        transcript_count = row["count"]

        transcript_df = annotations.pick_transcript(transcript_id)
        # do not shadow the `transcripts` dataframe that is being iterated
        generator = TranscriptGenerator(
            transcript_id,
            transcript_count,
            transcript_df,
            prob_inclusion=prob_inclusion,
        )
        generator.generate_annotations(output_annotations_file)
        generator.generate_transcripts(output_transcripts_file)
    LOG.info("Done.")
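

if __name__ == "__main__":
    # Minimal usage sketch: the file names are hypothetical and must point
    # to existing inputs; prob_inclusion is an arbitrary example value.
    logging.basicConfig(level=logging.INFO)
    sample_transcripts(
        input_transcripts_file="abundances.tsv",
        input_annotations_file="annotations.gtf",
        prob_inclusion=0.05,
        output_transcripts_file="sampled_abundances.csv",
        output_annotations_file="sampled_annotations.gtf",
    )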