diff --git a/tsg/cli.py b/tsg/cli.py
index 1a667d8b452041a112182529c6eeb071eb2ab40c..b67e3521d5d75044bbad196cccff43b611ab4dba 100644
--- a/tsg/cli.py
+++ b/tsg/cli.py
@@ -6,7 +6,14 @@ from tsg.main import sample_transcripts
 
 
 def setup_logging(loglevel: str = None) -> None:
-    # Set up logging
+    """Set up logging. Loglevel can be one of ["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG"].
+
+    Args:
+        loglevel (str, optional): Level of log output. Defaults to None.
+
+    Raises:
+        ValueError: If the given string is not a valid log level.
+    """
     if loglevel:
         numeric_level = getattr(logging, loglevel.upper())
         if not isinstance(numeric_level, int):
@@ -20,7 +27,7 @@ def setup_logging(loglevel: str = None) -> None:
     )
 
 
-def build_arg_parser():
+def build_arg_parser() -> argparse.ArgumentParser:
     parser = argparse.ArgumentParser()
     parser.add_argument("--transcripts", type=str)
     parser.add_argument("--annotation", type=str)
@@ -30,7 +37,7 @@ def build_arg_parser():
     return parser
 
 
-def get_args():
+def get_args() -> argparse.Namespace:
    parser = build_arg_parser()
    args = parser.parse_args()
 
@@ -39,6 +46,18 @@ def get_args():
 
 
 def output_filename(filename: str) -> str:
+    """Generate output filename for a given input filename.
+
+    Args:
+        filename (str): Input filename
+
+    Raises:
+        NotImplementedError: If the filetype is not .csv, .tsv or .gtf.
+        FileExistsError: If the output file already exists.
+
+    Returns:
+        str: Output filename
+    """
     filepath = Path(filename)
     if filepath.suffix == ".csv" or filepath.suffix == ".tsv":
         outfile = "generated_" + filepath.stem + ".csv"
diff --git a/tsg/main.py b/tsg/main.py
index 7cfa0d68552f1b24cb88a6381e6ad8b1cc0cdc99..ca96955926c50d60112616f94d9cfc024af8320f 100644
--- a/tsg/main.py
+++ b/tsg/main.py
@@ -1,3 +1,5 @@
+"""Sample transcripts."""
+
 import logging
 
 import numpy as np
@@ -9,7 +11,7 @@ LOG = logging.getLogger(__name__)
 
 
 def read_abundances(transcripts_file: str) -> pd.DataFrame:
-    """Read abundance file into dataframe
+    """Read abundance file into dataframe.
 
     Args:
         transcripts_file (str): Input filename
@@ -17,7 +19,7 @@ def read_abundances(transcripts_file: str) -> pd.DataFrame:
     Returns:
         pd.DataFrame: Transcript abundances ("id", "count")
 
-     Raises:
+    Raises:
         ValueError: When the input file is neither csv or tsv
     """
     cols: list = ["id", "count"]
@@ -30,61 +32,68 @@ def read_abundances(transcripts_file: str) -> pd.DataFrame:
 
 
 def filter_df(df: pd.DataFrame, transcripts: list = []) -> pd.DataFrame:
-    """Filters inclusions of exons and the highest transcript support level (TSL1)
-
-    Data is filtered from the pd.DataFrame to include the string entery of "exons", based on the number of transcripts
-    it will choose the transcript with the highest transcript support level (TSL1-5). It will filter a list of transcript
-    IDs if that is given as an input.
-
-    Args:
-        df: pd.DataFrame, transcript: list
-
-    Returns:
-        df_filter: filter strings from pd.DataFrame ("exons", "transcript_support_level "1"")
+    """Filter annotations to include only exons with the highest transcript support level (TSL1).
+
+    The `feature` column is filtered on the value "exon" and
+    the `free_text` column is filtered to include the string denoting the highest transcript support level
+    ('transcript_support_level "1"').
+
+    If a list of transcript IDs is given, the `free_text` column is additionally filtered to include one of the IDs.
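+
+    Example (an illustrative sketch; the two-column frame is a minimal stand-in for a full annotation table):
+
+        >>> df = pd.DataFrame({
+        ...     "feature": ["exon", "gene"],
+        ...     "free_text": ['transcript_support_level "1"'] * 2,
+        ... })
+        >>> filter_df(df)["feature"].tolist()
+        ['exon']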
+
+    Args:
+        df: A pd.DataFrame containing an unparsed gtf file
+        transcripts: A list of transcript IDs
+
+    Returns:
+        A pd.DataFrame containing only rows with exon annotations of the highest transcript support level and,
+        if provided, belonging to one of the given transcripts
     """
-    # Filter annotations to exon and highest transcript support level.
-    # If list of transcript ids is given, filter for that as well.
     df_filter = df[
         (df["feature"] == "exon")
         & (df["free_text"].str.contains('transcript_support_level "1"'))
     ]
     if len(transcripts) > 0:
-        df_filter = df_filter.str.contains("|".join(transcripts), regex=True)
+        df_filter = df_filter[
+            df_filter["free_text"].str.contains("|".join(transcripts), regex=True)
+        ]
     return df_filter
 
 
 def str_to_dict(s: str) -> dict:
-    """Split between key/value pairs
+    """Split string into key/value pairs.
+
+    Split string based on delimiter ';' into items, remove empty items and split items on delimiter ' ' into
+    key/value pairs.
+    Remove quotes from value strings and create a dictionary.
 
-    Creates a dictionary based on the split between key and value pairs from the item_list
-    Also removes quotes values, empty list items and then returns the dictionary
-
-    Args:
-        s: str
+    Args:
+        s: A string of the form 'gene_id "GENE1"; transcript_id "TRANSCRIPT1";'
 
     Returns:
-        items in item_list
+        A dictionary, e.g. {'gene_id': 'GENE1', 'transcript_id': 'TRANSCRIPT1'}
     """
-    # split between key/value pairs
-    # remove empty list items and split key, value pairs
+    # split into items
+    # remove empty items
+    # split items into key/value pairs
     item_list: list = [x.split() for x in s.split(";") if len(x) > 0]
     # remove quotes for values and return dictionary
     return {item[0]: item[1].strip('"') for item in item_list}
 
 
 def dict_to_str(d: dict) -> str:
-    """Joins a key/value pair with a space in a list
+    """Parse dictionary into gtf free_text column format.
+
+    Takes e.g. dictionary {'gene_id': 'GENE1', 'transcript_id': 'TRANSCRIPT1'} and returns
+    string 'gene_id "GENE1"; transcript_id "TRANSCRIPT1";'.
+    Key/value pairs are joined by a space to form an item and items are joined by ';' to form a string.
+    If a value is Not a Number (nan), the key/value pair is omitted from the string.
 
-    Takes key/value pairs from a dictionary and joins them with a space on a list
-    Joins items from said list that are marked with ; and end with ;
-    Checks if the value is Not a Number (nan)
-
-    Args:
-        "key", "value" (str)
+    Args:
+        d: A dictionary of the form {'gene_id': 'GENE1', 'transcript_id': 'TRANSCRIPT1'}
 
-    Returns:
-        s: str
+    Returns:
+        A string, e.g. 'gene_id "GENE1"; transcript_id "TRANSCRIPT1";'.
     """
     # join key, value pairs from dictionary with a space in a list,
     # then join items in list by ;
@@ -98,38 +107,35 @@ def dict_to_str(d: dict) -> str:
 
 
 def reverse_parse_free_text(df_all: pd.DataFrame) -> pd.DataFrame:
-    """Creates columns that are well defnined by .gtf file standards
-
+    """Reverse parse a gtf-based pd.DataFrame to include only columns that are well defined by gtf file standards.
+
     The first 8 defined columns are constant as defined by gtf file standards
-    Further columns are assumed to be free text columns and superflous
-
+    Further columns are assumed to be parsed free text columns (see Gtf.parse_free_text()).
+    The parsed free text columns are aggregated as a dictionary and the dictionary is parsed as a string in gtf format.
+
     Args:
-        df_all: DataFrame
+        df_all: A pd.DataFrame containing a parsed gtf file.
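+
+    Example (illustrative, for a single parsed row): parsed columns gene_id = "GENE1"
+    and transcript_id = "TRANSCRIPT1" are aggregated into the dictionary
+    {'gene_id': 'GENE1', 'transcript_id': 'TRANSCRIPT1'} and written back to the
+    `free_text` entry 'gene_id "GENE1"; transcript_id "TRANSCRIPT1";'.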
-    Returns:
-        DataFrame with 8 columns as defined by gtf file standards
+    Returns:
+        A DataFrame with the columns as defined by gtf file standards.
     """
-    # the first 8 columns should be constant according to gtf file standard
-    # we assume that further columns are parsed free text columns
+    # Define pd.DataFrame containing only parsed free text columns
     df_free_text = df_all.iloc[:, 8:]
-
+    # Define pd.DataFrame containing only non-parsed columns
     df = df_all.iloc[:, :8]
+    # Reverse parse free text columns and add the result as column `free_text` to the output pd.DataFrame
     df["free_text"] = df_free_text.agg(pd.Series.to_dict, axis=1).apply(dict_to_str)
     return df
 
 
 def write_gtf(df: pd.DataFrame, filename: str) -> None:
-    """Checks all data types in the pd.DataFrame
+    """Save annotations to file in gtf format.
 
-    Goes through the updated pd.DataFrame after formatting to gtf file standards
-    and checks if the data types have been formatted correctly.
-
-    Args:
-        Types ("filename", "sep", "header", "index", "quoting", "quotechar", "mode")
-        Filename: str
+    Makes sure the data types are correct and saves the object in gtf format.
 
-    Returns:
-        DataFrame defined correctly via gtf.dtypes
+    Args:
+        df: A pd.DataFrame containing a gtf file.
+        filename: File to save to.
     """
     # Make sure the data types are correct.
     df = df.astype(Gtf.dtypes)
@@ -146,26 +152,23 @@ def write_gtf(df: pd.DataFrame, filename: str) -> None:
 
 
 def write_header(annotations_file: str) -> None:
-    """Opens up an annotation file with the datatypes defined as correct
+    """Write the header of an annotations file, consisting of the tab-delimited column names.
 
     Args:
-
-    Returns:
-
+        annotations_file: Filename to write header to.
     """
     with open(annotations_file, "w") as fh:
         fh.write("\t".join(Gtf.dtypes.keys()) + "\n")
 
 
 class Gtf:
-    """Class to read transcripts annotations file and parse it into a pandas Dataframe.
-
-    Args:
-        annotations_file: Path to gtf file.
+    """Class to read a transcripts annotations file into a Gtf object.
 
     Attributes:
-        annotations_file: File with transcript annotation of the genome
-
+        dtypes: A dictionary containing column names and respective data types.
+        parsed: A boolean indicating if the pd.DataFrame is parsed.
+        original_columns: A list of columns not touched by parsing.
+        free_text_columns: A list of columns created during parsing of column `free_text`.
     """
 
     dtypes = {
@@ -181,26 +184,23 @@ class Gtf:
     }
 
     def __init__(self):
+        """Initialize Gtf object."""
         self.parsed = False
         self.original_columns = list(self.dtypes.keys())
         self.free_text_columns = []
 
     def read_file(self, annotations_file: str) -> None:
-        """Defines a limit for larger input Files. Iterates lines and Filters on bool.
-
-        If the chuncksize of the inputed annotation file is larger than 100000 it will
-        iterate over the lines and filters before saving.
+        """Read gtf file.
 
-        Args:
-            Path to annotations_file: str
+        Iterate over chunks of the gtf file, reading 100000 rows at a time. Filter chunks for exon annotations of
+        the highest transcript support level. Concatenate chunks to get the resulting pd.DataFrame.
 
-        Returns:
-            If the file chunk is over a certain size it will reiterate the lines and files.
+        Args:
+            annotations_file: Filename of annotations.
 
         Raises:
-            ValueError: The file type is required to be .gtf
+            ValueError: The file type is required to be gtf.
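+
+        Example (an illustrative sketch; the path is hypothetical):
+
+            gtf = Gtf()
+            gtf.read_file("annotations.gtf")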
""" - # for large annotation files, iterate over lines and filter before saving to dataframe if not annotations_file.endswith("gtf"): raise ValueError("File type needs to be gtf") @@ -216,36 +216,29 @@ class Gtf: self.df = pd.concat([filter_df(chunk) for chunk in reader]) def from_dataframe(self, df: pd.DataFrame) -> None: - """ Initializes Gtf object from pandas Dataframe. - Part of initialization is: - Set dataframe attribute + """Initialize Gtf object from pandas Dataframe. + + Part of initialization is: + Set dataframe attribute Check which columns belong to the free-text part of the GTF-file. Check if there are no columns called free-text and if so, sets the value of parsed attribute to TRUE. Args: - df: DataFrame - - Returns: - None + df: pd.DataFrame """ self.free_text_columns = [ col for col in df.columns if col not in self.original_columns ] self.df = df - if not "free_text" in df.columns: + if "free_text" not in df.columns: self.parsed = True def parse_free_text(self): - """Creates a self DataFrame with columns for parsed free text + """Parse key/value pairs from `free_text` column into column `key` with row entry `value`. - Creates a dataframe with columns for values in the free text column and then joins - the free_text column to orginal dataframe. Drops the free_text column itself. - - Args: - - Returns: - Parsed DataFrame with free_text column joined with orginal and dropped. - """ + Creates a dataframe with columns for keys in the free text column instead of `free_text` column. + Saves it to Gtf.df attribute. + """ assert self.parsed == False # create dataframe with columns for values in free_text column df_free_text = self.df["free_text"].map(str_to_dict).apply(pd.Series) @@ -259,16 +252,11 @@ class Gtf: self.parsed = True def reverse_parse_free_text(self): - """Creates a reversed self DataFrame with columns for non parsed free text - - Creates a data frame with only free_text columns and then filters current dataframe down - to only orginal_columns leaving the free_text column untouched. The parsing is undone and the results - saved in the free_text column and defined as non parsed. - - Args: + """Reverses parsing of `free_text` column. - Returns: - """ + Creates a data frame that can be written in gtf format to file. Parsed free text columns are aggregated + into `free_text` column according to gtf format specification. + """ assert self.parsed == True # create dataframe with only free_text columns df_free_text = self.df[self.free_text_columns] @@ -282,10 +270,13 @@ class Gtf: self.parsed = False def pick_transcript(self, transcript_id: str) -> pd.DataFrame: + """Filter annotations to a given transcript ID.""" return self.df.query(f"transcript_id == '{transcript_id}'") class TranscriptGenerator: + """Class to sample a transcript.""" + def __init__( self, transcript_id: str, @@ -293,6 +284,7 @@ class TranscriptGenerator: transcript_df: pd.DataFrame, prob_inclusion: float, ): + """Initialize TranscriptGenerator object.""" assert len(transcript_df) > 0 assert transcript_count > 0 assert (prob_inclusion >= 0) and (prob_inclusion <= 1) @@ -305,10 +297,12 @@ class TranscriptGenerator: self.prob_inclusion = prob_inclusion def _get_inclusions(self) -> np.array: - """Generate inclusions array where each column corresponds to one sample and the number of columns corresponds to the number of samples. + """Generate inclusions array. + + Each column corresponds to one sample and the number of columns corresponds to the number of samples. 
 
         Returns:
-            np.array: inclusions, where True means intron inclusion
+            inclusion_arr: A boolean np.array, where True means intron inclusion.
         """
         inclusion_arr = np.random.rand(self.no_exons, self.count) < self.prob_inclusion
         if self.strand == "+":
@@ -320,11 +314,14 @@
 
     def _get_unique_inclusions(self) -> tuple[list, np.array, np.array]:
         """Inclusion of unique intron inclusion via arrays and counts and name generation of each unique count.
-
+
         Args:
 
         Returns:
-            Bool: If true include unique intron array and count and create unique name and count.
+            names: A list of names for the generated transcripts.
+            inclusion_arr_unique: A boolean np.array where columns correspond to generated transcripts and rows to
+                intron inclusion.
+            counts: A np.array containing the number of samples per generated transcript.
         """
         inclusion_arr = self._get_inclusions()
         # Unique intron inclusion arrays and counts
@@ -345,11 +342,11 @@
         """Take as input a dataframe filtered to one transcript and a boolean vector denoting intron inclusions.
 
         Args:
-            inclusions (np.array): boolean vector denoting intron inclusion
-            transcript_id (str): transcript id
+            inclusions (np.array): A boolean vector denoting intron inclusion.
+            transcript_id (str): The transcript id.
 
         Returns:
-            pd.DataFrame: Derived dataframe
+            The generated transcript as a pd.DataFrame.
         """
         df_generated = self.df.copy()
         if self.strand == "+":
@@ -381,7 +378,7 @@
         """Write transcripts to file.
 
         Args:
-            filename (str): Output csv filename
+            filename (str): Output csv filename.
         """
         ids, inclusions, counts = self._get_unique_inclusions()
         with open(filename, "a") as fh:
@@ -389,16 +386,13 @@
             fh.write(f"{transcript_id},{self.id},{transcript_count}\n")
 
     def generate_annotations(self, filename: str) -> None:
-        """Generates a gtf file including IDs, inclusion, and counts from reverse parse free text
-
+        """Generate annotations in gtf format for a sampled transcript.
+
         Args:
-            Filename: str
-
-        Returns:
-            Gtf file with filename
-
+            filename: Output gtf filename.
+
         Raises:
-            ValueError: If self.ID could not be sampled (No ID generated for the inclusion transcript)
+            ValueError: If the given transcript ID could not be sampled.
         """
         ids, inclusions, counts = self._get_unique_inclusions()
         n_unique = len(ids)
@@ -422,6 +416,15 @@ def sample_transcripts(
     output_transcripts_file: str,
     output_annotations_file: str,
 ):
+    """Read input files, iterate over transcript IDs, sample each transcript and save results.
+
+    Args:
+        input_transcripts_file (str): Filename of transcript abundances, needs to be csv or tsv.
+        input_annotations_file (str): Filename of annotations, needs to be gtf.
+        prob_inclusion (float): Probability of intron inclusion, needs to be a float in the range [0, 1].
+        output_transcripts_file (str): Filename of file to write sampled transcripts to.
+        output_annotations_file (str): Filename of file to write generated annotations to.
+    """
     transcripts = read_abundances(input_transcripts_file)
 
     LOG.info("Parsing annotations...")