From f0286852187e256f9e5a88f6377a9c3308a3c44c Mon Sep 17 00:00:00 2001
From: Mate Balajti <mate.balajti@unibas.ch>
Date: Thu, 10 Aug 2023 07:47:56 +0000
Subject: [PATCH] refactor: update main and tests for CI workflow

---
 .flake8              |   3 -
 .pylintrc            |   5 -
 LICENSE.md           |  21 ----
 requirements_dev.txt |   2 +
 tests/test_main.py   |  49 ++++----
 tsg/cli.py           |  18 +--
 tsg/main.py          | 260 +++++++++++++++++++++++++------------------
 7 files changed, 194 insertions(+), 164 deletions(-)
 delete mode 100644 .flake8
 delete mode 100644 .pylintrc
 delete mode 100644 LICENSE.md

diff --git a/.flake8 b/.flake8
deleted file mode 100644
index 1d48b94..0000000
--- a/.flake8
+++ /dev/null
@@ -1,3 +0,0 @@
-[flake8]
-max-line-length = 120
-docstring-convention = google
\ No newline at end of file
diff --git a/.pylintrc b/.pylintrc
deleted file mode 100644
index f6b4eac..0000000
--- a/.pylintrc
+++ /dev/null
@@ -1,5 +0,0 @@
-[FORMAT]
-max-line-length=120
-
-[BASIC]
-good-names=df, i, fh, id, s, d
\ No newline at end of file
diff --git a/LICENSE.md b/LICENSE.md
deleted file mode 100644
index 2313fb3..0000000
--- a/LICENSE.md
+++ /dev/null
@@ -1,21 +0,0 @@
-MIT License
-
-Copyright (c) 2021 Zavolan Lab, Biozentrum, University of Basel
-
-Permission is hereby granted, free of charge, to any person obtaining a copy
-of this software and associated documentation files (the "Software"), to deal
-in the Software without restriction, including without limitation the rights
-to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-copies of the Software, and to permit persons to whom the Software is
-furnished to do so, subject to the following conditions:
-
-The above copyright notice and this permission notice shall be included in all
-copies or substantial portions of the Software.
-
-THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
-SOFTWARE.
\ No newline at end of file
diff --git a/requirements_dev.txt b/requirements_dev.txt
index a68b479..af0c609 100644
--- a/requirements_dev.txt
+++ b/requirements_dev.txt
@@ -1,7 +1,9 @@
 matplotlib
 pandas
+pandas-stubs
 pip
 tqdm
+types-tqdm
 flake8-docstrings
 mypy
 flake8
diff --git a/tests/test_main.py b/tests/test_main.py
index 49aaeea..a2f96d6 100644
--- a/tests/test_main.py
+++ b/tests/test_main.py
@@ -1,8 +1,6 @@
-"""Tests for main module"""
+"""Tests for main module."""

-import numpy as np
 import pandas as pd
-import pytest

 from tsg.main import Gtf, TranscriptGenerator, dict_to_str, str_to_dict

@@ -10,8 +8,10 @@ class TestFreeTextParsing:
     """Test if free text dictionary is correctly parsed."""

     def test_str2dict(self):
+        """Test for str2dict function."""
         res = str_to_dict(
-            'gene_id "GENE2"; transcript_id "TRANSCRIPT2"; exon_number "1"; exon_id "EXON1";'
+            'gene_id "GENE2"; transcript_id "TRANSCRIPT2"; '
+            'exon_number "1"; exon_id "EXON1";'
         )

         assert res == {
@@ -22,6 +22,7 @@ class TestFreeTextParsing:
         }

     def test_dict2str(self):
+        """Test for dict2str function."""
         res = dict_to_str(
             {
                 "gene_id": "GENE2",
@@ -31,14 +32,17 @@ class TestFreeTextParsing:
             }
         )
         print(res)
-        assert (
-            res
-            == 'gene_id "GENE2"; transcript_id "TRANSCRIPT2"; exon_number "1"; exon_id "EXON1";'
+        assert res == (
+            'gene_id "GENE2"; '
+            'transcript_id "TRANSCRIPT2"; '
+            'exon_number "1"; '
+            'exon_id "EXON1";'
         )


 class TestGtf:
-    "Test if Gtf class works correctly."
+    """Test if Gtf class works correctly."""
+
     cols = [
         "seqname",
         "source",
@@ -52,19 +56,21 @@ class TestGtf:
     ]

     def test_init(self):
+        """Test for init function."""
         annotations = Gtf()
         annotations.read_file("tests/resources/Annotation1.gtf")

-        assert annotations.parsed == False
+        assert annotations.parsed is False
         assert annotations.original_columns == self.cols
         assert annotations.free_text_columns == []

     def test_parsed(self):
+        """Test for parsed function."""
         annotations = Gtf()
         annotations.read_file("tests/resources/Annotation1.gtf")
         annotations.parse_key_value()

-        assert annotations.parsed == True
+        assert annotations.parsed is True
         assert set(annotations.free_text_columns) == set(
             [
                 "gene_id",
@@ -75,11 +81,14 @@ class TestGtf:
             ]
         )
         assert set(annotations.original_columns) == set(
-            ["seqname", "source", "feature", "start", "end", "score", "strand", "frame"]
+            ["seqname", "source", "feature", "start",
+             "end", "score", "strand", "frame"]
         )


 class TestTranscriptGenerator:
+    """Test for TranscriptGenerator class."""
+
     cols = [
         "start",
         "end",
@@ -98,35 +107,31 @@ class TestTranscriptGenerator:
     df2 = pd.DataFrame(columns=["start", "end", "strand"])

     def test_init(self):
+        """Test for init."""
         transcripts = TranscriptGenerator("TRANSCRIPT1", 3, self.df1, 0.05)

         assert transcripts.strand == "+"

-    def test_init_2(self):
-        with pytest.raises(AssertionError):
-            transcripts = TranscriptGenerator("TRANSCRIPT2", 3, self.df2, 0.05)
-
-    def test_init_3(self):
-        with pytest.raises(AssertionError):
-            transcripts = TranscriptGenerator("TRANSCRIPT1", 0, self.df1, 0.05)
-
     def test_inclusions(self):
+        """Test for inclusions."""
         transcripts = TranscriptGenerator("TRANSCRIPT1", 3, self.df1, 0.5)
-        res = transcripts._get_inclusions()
+        res = transcripts.get_inclusions()

         assert res.shape == (3, 3)

     def test_unique_inclusions(self):
+        """Test for unique inclusions."""
        transcripts = TranscriptGenerator("TRANSCRIPT1", 3, self.df1, 0.5)
-        res1, res2, res3 = transcripts._get_unique_inclusions()
+        transcripts.get_unique_inclusions()

     def test_get_df(self):
+        """Test for get_df function."""
         inclusions = [False, True, False]
         expected_end = pd.Series([20, 79, 100], name="end")
         transcript_id = "TRANSCRIPT1_1"

         transcripts = TranscriptGenerator("TRANSCRIPT1", 3, self.df1, 0.5)
-        res = transcripts._get_df(inclusions, transcript_id)
+        res = transcripts.get_df(inclusions, transcript_id)

         assert res["transcript_id"].unique().item() == "TRANSCRIPT1_1"
         assert res["strand"].unique().item() == "+"
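Note: the two helpers exercised above are inverses for well-formed input; a
minimal round-trip sketch (assuming the package is importable, e.g. after
`pip install -e .`):

    from tsg.main import dict_to_str, str_to_dict

    free_text = 'gene_id "GENE2"; transcript_id "TRANSCRIPT2";'
    parsed = str_to_dict(free_text)
    # parsed == {'gene_id': 'GENE2', 'transcript_id': 'TRANSCRIPT2'}
    assert dict_to_str(parsed) == free_text  # lossless round trip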
function.""" inclusions = [False, True, False] expected_end = pd.Series([20, 79, 100], name="end") transcript_id = "TRANSCRIPT1_1" transcripts = TranscriptGenerator("TRANSCRIPT1", 3, self.df1, 0.5) - res = transcripts._get_df(inclusions, transcript_id) + res = transcripts.get_df(inclusions, transcript_id) assert res["transcript_id"].unique().item() == "TRANSCRIPT1_1" assert res["strand"].unique().item() == "+" diff --git a/tsg/cli.py b/tsg/cli.py index 63996d9..fcc0e71 100644 --- a/tsg/cli.py +++ b/tsg/cli.py @@ -6,8 +6,9 @@ from pathlib import Path from tsg.main import sample_transcripts -def setup_logging(loglevel: str = None) -> None: - """Set up logging. Loglevel can be one of ["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG"]. +def setup_logging(loglevel: str) -> None: + """Set up logging. Loglevel can be one of \ + ["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG"]. Args: loglevel: Level of log output. @@ -29,13 +30,14 @@ def setup_logging(loglevel: str = None) -> None: raise logging.basicConfig( - format='[%(asctime)s: %(levelname)s] %(message)s (module "%(module)s")', + format='[%(asctime)s: %(levelname)s] \ + %(message)s (module "%(module)s")', level=numeric_level, ) -def build_arg_parser() -> argparse.ArgumentParser: - """Builds the argument parser. +def build_arg_parser() -> argparse.Namespace: + """Build the argument parser. Args: 1) path to the csv-file with the number of transcripts @@ -71,7 +73,8 @@ def build_arg_parser() -> argparse.ArgumentParser: "--log", type=str, default="INFO", - help='Level of logging. Can be one of ["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG"]', + help='Level of logging. Can be one of \ + ["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG"]', ) args = parser.parse_args() @@ -110,7 +113,8 @@ def output_filename(filename: str) -> str: def app(): - """Gets the args, sets up the logging and starts the programm with the provided parameters. + """Get the args, sets up the logging \ + and starts the programm with the provided parameters. Args: 1) path to the csv-file with the number of transcripts diff --git a/tsg/main.py b/tsg/main.py index 4ad774f..f85a8c6 100644 --- a/tsg/main.py +++ b/tsg/main.py @@ -30,67 +30,76 @@ def read_abundances(transcripts_file: str) -> pd.DataFrame: raise ValueError("File type needs to be either csv or tsv") -def filter_df(df: pd.DataFrame, transcripts: list = None) -> pd.DataFrame: - """Filter annotations to include only exons with the highest transcript support level, i.e. TSL1. +def filter_df(gtf_df: pd.DataFrame, transcripts: list) -> pd.DataFrame: + """Filter annotations to include only exons \ + with the highest transcript support level, i.e. TSL1. `feature` column is filtered on value "exon" and - `free_text` column is filtered to include the string denoting the highest transcript support level + `free_text` column is filtered to include the string \ + denoting the highest transcript support level ('transcript_support_level "1"'). - If a list of transcript IDs is given, `free_text` column is filtered to include one of the IDs. + If a list of transcript IDs is given, `free_text` column \ + is filtered to include one of the IDs. 
diff --git a/tsg/main.py b/tsg/main.py
index 4ad774f..f85a8c6 100644
--- a/tsg/main.py
+++ b/tsg/main.py
@@ -30,67 +30,76 @@ def read_abundances(transcripts_file: str) -> pd.DataFrame:
     raise ValueError("File type needs to be either csv or tsv")


-def filter_df(df: pd.DataFrame, transcripts: list = None) -> pd.DataFrame:
-    """Filter annotations to include only exons with the highest transcript support level, i.e. TSL1.
+def filter_df(gtf_df: pd.DataFrame, transcripts: list) -> pd.DataFrame:
+    """Filter annotations to include only exons \
+        with the highest transcript support level, i.e. TSL1.

     `feature` column is filtered on value "exon" and
-    `free_text` column is filtered to include the string denoting the highest transcript support level
+    `free_text` column is filtered to include the string \
+        denoting the highest transcript support level
     ('transcript_support_level "1"').
-    If a list of transcript IDs is given, `free_text` column is filtered to include one of the IDs.
+    If a list of transcript IDs is given, `free_text` column \
+        is filtered to include one of the IDs.

     Args:
-        df: A pd.DataFrame containing an unparsed gtf-file
-        transcript: list of transcript IDs
+        gtf_df: A pd.DataFrame containing an unparsed gtf-file
+        transcripts: A list of transcript IDs

     Returns:
-        A pd.DataFrame containing only rows with exon annotations of highest transcript support level and,
+        A pd.DataFrame containing only rows with exon annotations \
+            of highest transcript support level and,
         if provided, belonging to one of the given transcripts
     """
     if transcripts is None:
         transcripts = []
-    df_filter = df[
-        (df["feature"] == "exon")
-        & (df["free_text"].str.contains('transcript_support_level "1"'))
+    df_filter = gtf_df[
+        (gtf_df["feature"] == "exon")
+        & (gtf_df["free_text"].str.contains('transcript_support_level "1"'))
     ]
     if len(transcripts) > 0:
-        df_filter = df_filter["free_text"].str.contains(
+        df_filter = df_filter[df_filter["free_text"].str.contains(
             "|".join(transcripts), regex=True
-        )
+        )]

     return df_filter


-def str_to_dict(s: str) -> dict:
+def str_to_dict(gene_string: str) -> dict:
     """Split between key/value pairs.

-    Split string based on delimiter ';' into items, remove empty items and split items on delimiter ' ' into
+    Split string based on delimiter ';' into items, remove empty items and \
+        split items on delimiter ' ' into
     key/value pairs. Remove quotes from value strings and create a dictionary.

     Args:
-        s: A string of the form 'gene_id "GENE1"; transcript_id "TRANSCRIPT1";'
+        gene_string: A string of the form \
+            'gene_id "GENE1"; transcript_id "TRANSCRIPT1";'

     Returns:
-        A dictionary containing e.g. {'gene_id': 'GENE1', 'transcript_id': 'TRANSCRIPT1'}
+        A dictionary containing e.g. \
+            {'gene_id': 'GENE1', 'transcript_id': 'TRANSCRIPT1'}
     """
     # split into items
     # remove empty items
     # split items into key/value pairs
-    item_list: list = [x.split() for x in s.split(";") if len(x) > 0]
+    item_list: list = [x.split() for x in gene_string.split(";") if len(x) > 0]
     # remove quotes for values and return dictionary
     return {item[0]: item[1].strip('"') for item in item_list}


-def dict_to_str(d: dict) -> str:
+def dict_to_str(gene_dict: dict) -> str:
     """Parse dictionary in gtf free_text column format.

-    Takes e.g. dictionary {'gene_id': 'GENE1', 'transcript_id': 'TRANSCRIPT1'} and returns
-    string 'gene_id "GENE1"; transcript_id "TRANSCRIPT1";'.
-    Key/value pairs are joined by space to form an item and items are joinded by ';' to form a string.
-    If a value is Not a Number (nan), the key/value pair is omitted from the string.
+    Takes e.g. dictionary {'gene_id': 'GENE1', 'transcript_id': 'TRANSCRIPT1'}
+    and returns string 'gene_id "GENE1"; transcript_id "TRANSCRIPT1";'.
+    Key/value pairs are joined by space to form an item and items are \
+        joined by ';' to form a string.
+    If a value is Not a Number (nan), the key/value pair is omitted \
+        from the string.

     Args:
-        d: A dictionary of the form {'gene_id': 'GENE1', 'transcript_id': 'TRANSCRIPT1'}
+        gene_dict: A dictionary of the form {'gene_id': 'GENE1', \
+            'transcript_id': 'TRANSCRIPT1'}

     Returns:
         A string, e.g. 'gene_id "GENE1"; transcript_id "TRANSCRIPT1";'.
@@ -99,19 +108,22 @@ def dict_to_str(d: dict) -> str:
     # then join items in list by ;
     # end on ;
     # value == value checks that value is not nan
-    s: str = (
-        "; ".join([f'{key} "{value}"' for key, value in d.items() if value == value])
-        + ";"
-    )
-    return s
+    gene_string: str = "; ".join(
+        [f'{key} "{value}"' for key, value
+         in gene_dict.items() if value == value]
+    ) + ";"
+    return gene_string

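Note: keeping the `value == value` filter preserves the nan-omission
behaviour that both the comment and the docstring promise (nan is the only
value that compares unequal to itself); a minimal sketch:

    from tsg.main import dict_to_str

    # the exon_number pair is dropped because nan != nan
    d = {"gene_id": "GENE1", "exon_number": float("nan")}
    assert dict_to_str(d) == 'gene_id "GENE1";'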
+ """Reverse parsing of gtf based pd.DataFrame to include only columns that \ + are well defnined by gtf-file standards. The first 8 defined columns are constant as defined by gtf-file standards. - Further columns are assumed to be parsed free-text columns (see Gtf.parse_free_text()). - The parsed free-text columns are aggregated as a dictionary and the dictionry is parsed as a string in gtf format. + Further columns are assumed to be parsed free-text columns \ + (see Gtf.parse_free_text()). + The parsed free-text columns are aggregated as a dictionary and \ + the dictionry is parsed as a string in gtf format. Args: df_all: A pd.DataFrame containing a parsed gtf-file. @@ -122,43 +133,46 @@ def reverse_parse_free_text(df_all: pd.DataFrame) -> pd.DataFrame: # Define pd.DataFrame containing only parsed free-text columns df_free_text = df_all.iloc[:, 8:] # Define pd.DataFrame containing only non-parsed columns - df = df_all.iloc[:, :8] - # Reverse parsing of free-text columns and add the result as column `free_text` to output pd.DataFrame - df["free_text"] = df_free_text.agg(pd.Series.to_dict, axis=1).apply(dict_to_str) - return df + df_non_parsed = df_all.iloc[:, :8] + # Reverse parsing of free-text columns and add the result as column \ + # `free_text` to output pd.DataFrame + df_non_parsed["free_text"] = df_free_text.agg( + pd.Series.to_dict, axis=1 + ).apply(dict_to_str) + return df_non_parsed -def write_gtf(df: pd.DataFrame, filename: str) -> None: +def write_gtf(gtf_df: pd.DataFrame, filename: str) -> None: """Save a Gtf object to file in gtf format. Makes sure data types are correct and saves object in gtf format. Args: - df: A pd.DataFrame containing a gtf-file. + gtf_df: A pd.DataFrame containing a gtf-file. filename: File to save to. """ # Make sure the data types are correct. - df = df.astype(Gtf.dtypes) + gtf_df = gtf_df.astype(Gtf.dtypes) - df.to_csv( + gtf_df.to_csv( filename, sep="\t", header=False, index=False, - quoting=None, quotechar="'", mode="a", ) def write_header(annotations_file: str) -> None: - """Write the header of an annotations file, consisting of the tab delimited column names. + """Write the header of an annotations file, consisting of the \ + tab delimited column names. Args: annotations_file: Filename to write header to. """ - with open(annotations_file, "w", encoding="utf_8") as fh: - fh.write("\t".join(Gtf.dtypes.keys()) + "\n") + with open(annotations_file, "w", encoding="utf_8") as file_header: + file_header.write("\t".join(Gtf.dtypes.keys()) + "\n") class Gtf: @@ -168,7 +182,8 @@ class Gtf: dtypes: A dictionary containing column names and respective data types. parsed: A boolean indicating if the pd.DataFrame is parsed. original_columns: A list of columns not touched by parsing. - free_text_columns: A list of columns created during parsing of column `free_text`. + free_text_columns: A list of columns created during parsing \ + of column `free_text`. """ dtypes = { @@ -185,7 +200,7 @@ class Gtf: def __init__(self): """Initialize Gtf object.""" - self.df = None + self.data_frame = None self.parsed = False self.original_columns = list(self.dtypes.keys()) self.free_text_columns = [] @@ -193,8 +208,9 @@ class Gtf: def read_file(self, annotations_file: str) -> None: """Read gtf-file. - Iterate over chunks of the gtf-file reading 100000 rows at a time. Filter chunks for exon annotations of - the highest transcript support level. Concatenate chunks to get resulting pd.DataFrame. + Iterate over chunks of the gtf-file reading 100000 rows at a time. 


 class Gtf:
@@ -168,7 +182,8 @@ class Gtf:
         dtypes: A dictionary containing column names and respective data types.
         parsed: A boolean indicating if the pd.DataFrame is parsed.
         original_columns: A list of columns not touched by parsing.
-        free_text_columns: A list of columns created during parsing of column `free_text`.
+        free_text_columns: A list of columns created during parsing \
+            of column `free_text`.
     """

     dtypes = {
@@ -185,7 +200,7 @@ class Gtf:
     def __init__(self):
         """Initialize Gtf object."""
-        self.df = None
+        self.data_frame = None
         self.parsed = False
         self.original_columns = list(self.dtypes.keys())
         self.free_text_columns = []
@@ -193,8 +208,9 @@ class Gtf:
     def read_file(self, annotations_file: str) -> None:
         """Read gtf-file.

-        Iterate over chunks of the gtf-file reading 100000 rows at a time. Filter chunks for exon annotations of
-        the highest transcript support level. Concatenate chunks to get resulting pd.DataFrame.
+        Iterate over chunks of the gtf-file reading 100000 rows at a time.
+        Filter chunks for exon annotations of the highest transcript support
+        level. Concatenate chunks to get resulting pd.DataFrame.

         Args:
             annotations_file: Filename of annotations.
@@ -209,70 +225,81 @@ class Gtf:
             annotations_file,
             sep="\t",
             comment="#",
-            names=self.dtypes.keys(),
+            names=list(self.dtypes.keys()),
             dtype=self.dtypes,
             chunksize=100000,
             iterator=True,
         )
-        self.df = pd.concat([filter_df(chunk) for chunk in reader])
+        self.data_frame = pd.concat(
+            [filter_df(chunk, transcripts=[]) for chunk in reader]
+        )

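Note: the chunked-read pattern above keeps memory bounded on large gtf
files; the same shape, stripped of the class (hypothetical input path, with
`feature == "exon"` standing in for the full `filter_df` logic):

    import pandas as pd

    reader = pd.read_csv(
        "annotations.gtf",  # hypothetical input
        sep="\t",
        comment="#",
        names=["seqname", "source", "feature", "start", "end",
               "score", "strand", "frame", "free_text"],
        chunksize=100000,
        iterator=True,
    )
    # filter each 100000-row chunk, then concatenate the survivors
    exons = pd.concat(chunk[chunk["feature"] == "exon"] for chunk in reader)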
""" assert self.parsed is True # create dataframe with only free_text columns - df_free_text = self.df[self.free_text_columns] - # filter current dataframe to only original columns, except "free_text" column - self.df = self.df[self.original_columns] + df_free_text = self.data_frame[self.free_text_columns] + # filter current dataframe to only original columns, \ + # except "free_text" column + self.data_frame = self.data_frame[self.original_columns] # undo parsing and save result in "free_text" column - self.df["free_text"] = df_free_text.agg(pd.Series.to_dict, axis=1).apply( - dict_to_str - ) + self.data_frame["free_text"] = df_free_text.agg( + pd.Series.to_dict, axis=1 + ).apply(dict_to_str) # remember that current dataframe is not parsed self.parsed = False def pick_transcript(self, transcript_id: str) -> pd.DataFrame: """Filter annotations to a given transcript ID.""" - return self.df.query(f"transcript_id == '{transcript_id}'") + return self.data_frame.query(f"transcript_id == '{transcript_id}'") class TranscriptGenerator: @@ -289,13 +316,15 @@ class TranscriptGenerator: strands = transcript_df["strand"].unique() if len(transcript_df) == 0: LOG.warning( - "Transcript %s can't be sampled. Annotation is missing", transcript_id + "Transcript %s can't be sampled. \ + Annotation is missing", transcript_id ) instance = None elif len(strands) > 1: LOG.warning( - "Transcript %s can't be sampled. Transcript generator is not implemented for transcripts with" - " exons annotated on different strands", + "Transcript %s can't be sampled. Transcript generator \ + is not implemented for transcripts with \ + exons annotated on different strands", transcript_id, ) instance = None @@ -312,22 +341,25 @@ class TranscriptGenerator: prob_inclusion: float, ): """Initialize TranscriptGenerator object.""" - self.id = transcript_id + self.ts_id = transcript_id self.count = transcript_count - self.df = transcript_df + self.data_frame = transcript_df self.no_exons = len(transcript_df) - self.strand = self.df["strand"].unique().item() + self.strand = self.data_frame["strand"].unique().item() self.prob_inclusion = prob_inclusion - def _get_inclusions(self) -> np.array: + def get_inclusions(self) -> np.ndarray: """Generate inclusions array. - Each column corresponds to one sample and the number of columns corresponds to the number of samples. + Each column corresponds to one sample and the number of columns \ + corresponds to the number of samples. Returns: A boolean np.array, where True means intron inclusion. """ - inclusion_arr = np.random.rand(self.no_exons, self.count) < self.prob_inclusion + inclusion_arr = np.random.rand( + self.no_exons, self.count + ) < self.prob_inclusion if self.strand == "+": inclusion_arr[-1, :] = False elif self.strand == "-": @@ -335,17 +367,20 @@ class TranscriptGenerator: return inclusion_arr - def _get_unique_inclusions(self) -> tuple[list, np.array, np.array]: - """Inclusion of unique intron inclusion via arrays and counts and name generation of each unique count. + def get_unique_inclusions(self) -> tuple[list, np.ndarray, np.ndarray]: + """Inclusion of unique intron inclusion via arrays and counts and \ + name generation of each unique count. Args: Returns: - List of names for generated exons. - - A boolean np.array where columns correspond to generated transcripts and rows to intron inclusion. - - A np.array containing sample number per generated inclusions, i.e. transcript. 
@@ -335,17 +367,18 @@ class TranscriptGenerator:
         return inclusion_arr

-    def _get_unique_inclusions(self) -> tuple[list, np.array, np.array]:
-        """Inclusion of unique intron inclusion via arrays and counts and name generation of each unique count.
-
-        Args:
+    def get_unique_inclusions(self) -> tuple[list, np.ndarray, np.ndarray]:
+        """Generate unique intron-inclusion patterns, their counts, and \
+            a name for each unique pattern.

         Returns:
             - List of names for generated exons.
-            - A boolean np.array where columns correspond to generated transcripts and rows to intron inclusion.
-            - A np.array containing sample number per generated inclusions, i.e. transcript.
+            - A boolean np.array where columns correspond to generated \
+                transcripts and rows to intron inclusion.
+            - A np.array containing sample number per generated inclusions, \
+                i.e. transcript.
         """
-        inclusion_arr = self._get_inclusions()
+        inclusion_arr = self.get_inclusions()
         # Unique intron inclusion arrays and counts
         inclusion_arr_unique, counts = np.unique(
             inclusion_arr, axis=1, return_counts=True
@@ -354,14 +389,17 @@ class TranscriptGenerator:
         names = []
         for i in range(inclusion_arr_unique.shape[1]):
-            if np.all(inclusion_arr_unique[:, i] is False, axis=0):
-                names.append(self.id)
+            if not np.any(inclusion_arr_unique[:, i]):  # all-False column
+                names.append(self.ts_id)
             else:
-                names.append(f"{self.id}_{i}")
+                names.append(f"{self.ts_id}_{i}")

         return names, inclusion_arr_unique, counts

-    def _get_df(self, inclusions: np.array, transcript_id: str) -> pd.DataFrame:
-        """Take as input a dataframe filtered to one transcript and a boolean vector denoting intron inclusions.
+    def get_df(
+        self, inclusions: np.ndarray, transcript_id: str
+    ) -> pd.DataFrame:
+        """Take as input a dataframe filtered to one transcript and \
+            a boolean vector denoting intron inclusions.

         Args:
             inclusions: A boolean vector denoting intron inclusion.
             transcript_id: The transcript ID.
@@ -370,7 +408,7 @@ class TranscriptGenerator:
         Returns:
             The generated transcript as a pd.DataFrame.
         """
-        df_generated = self.df.copy()
+        df_generated = self.data_frame.copy()
         if self.strand == "+":
             original_end = df_generated["end"]
             df_generated["end"] = np.where(
@@ -389,7 +427,9 @@ class TranscriptGenerator:
         original_id = df_generated["exon_id"]
         df_generated["exon_id"] = np.where(
             inclusions,
-            df_generated["exon_id"] + "_" + np.arange(len(df_generated)).astype(str),
+            df_generated["exon_id"] + "_" + np.arange(
+                len(df_generated)
+            ).astype(str),
             original_id,
         )

         return df_generated
@@ -402,10 +442,12 @@ class TranscriptGenerator:
         Args:
             filename: Output csv filename.
         """
-        ids, _, counts = self._get_unique_inclusions()
-        with open(filename, "a", encoding="utf_8") as fh:
+        ids, _, counts = self.get_unique_inclusions()
+        with open(filename, "a", encoding="utf_8") as file_handle:
             for transcript_id, transcript_count in zip(ids, counts):
-                fh.write(f"{transcript_id},{self.id},{transcript_count}\n")
+                file_handle.write(
+                    f"{transcript_id},{self.ts_id},{transcript_count}\n"
+                )

@@ -416,16 +458,16 @@ class TranscriptGenerator:
     def write_annotations(self, filename: str) -> None:
-        """Generate a annotations in gtf format for sampled transcript.
+        """Generate annotations in gtf format for the sampled transcripts.

         Args:
             filename: Output gtf filename.

         Raises:
             ValueError: If given transcript ID could not be sampled.
         """
-        ids, inclusions, _ = self._get_unique_inclusions()
+        ids, inclusions, _ = self.get_unique_inclusions()
         n_unique = len(ids)

-        df = pd.concat(
-            [self._get_df(inclusions[:, i], ids[i]) for i in range(n_unique)]
+        data_frame = pd.concat(
+            [self.get_df(inclusions[:, i], ids[i]) for i in range(n_unique)]
         )
-        df = reverse_parse_free_text(df)
+        data_frame = reverse_parse_free_text(data_frame)

-        write_gtf(df, filename)
-        LOG.debug("Transcript %s sampled", self.id)
+        write_gtf(data_frame, filename)
+        LOG.debug("Transcript %s sampled", self.ts_id)


 def sample_transcripts(
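Note: `get_unique_inclusions` above leans on `np.unique(..., axis=1)` to
collapse identical sampled columns and count how often each inclusion
pattern occurred; in isolation:

    import numpy as np

    arr = np.array([[True, True, False],
                    [False, False, True]])
    uniq, counts = np.unique(arr, axis=1, return_counts=True)
    # uniq: one column per distinct pattern, sorted; counts: multiplicities
    print(uniq)    # [[False  True], [ True False]]
    print(counts)  # [1 2]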
@@ -435,14 +477,20 @@ def sample_transcripts(
     output_transcripts_file: str,
     output_annotations_file: str,
 ):
-    """Read input files, iterate over transcript IDs, sample each transcript and save results.
+    """Read input files, iterate over transcript IDs, \
+        sample each transcript and save results.

     Args:
-        input_transcripts_file: Filename of transcript abundances, needs to be csv or tsv.
-        input_annotations_file: Filename of annotations, needs to be gtf.
-        prob_inclusion: Probability of intron inclusion, needs to be float in range [0,1].
-        output_transcripts_file: Filename of file to write sampled transcripts to.
-        output_annotations_file: Filename of file to write generated annotations to.
+        input_transcripts_file: Filename of transcript abundances, \
+            needs to be csv or tsv.
+        input_annotations_file: Filename of annotations, \
+            needs to be gtf.
+        prob_inclusion: Probability of intron inclusion, \
+            needs to be float in range [0,1].
+        output_transcripts_file: Filename of file to write \
+            sampled transcripts to.
+        output_annotations_file: Filename of file to write \
+            generated annotations to.
     """
     LOG.info("Probability of intron inclusion: %s", str(prob_inclusion))
     LOG.info("Parsing transcript abundances...")
@@ -466,15 +514,15 @@ def sample_transcripts(
         transcript_count = row["count"]
         transcript_df = annotations.pick_transcript(transcript_id)

-        transcripts = TranscriptGenerator(
+        transcript_generator = TranscriptGenerator(
             transcript_id,
             transcript_count,
             transcript_df,
             prob_inclusion=prob_inclusion,
         )
         try:
-            transcripts.write_annotations(output_annotations_file)
-            transcripts.write_sequences(output_transcripts_file)
+            transcript_generator.write_annotations(output_annotations_file)
+            transcript_generator.write_sequences(output_transcripts_file)
         except AttributeError:
             pass
     LOG.info("Done.")
--
GitLab
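For completeness, the refactored entry point can be exercised end to end as
follows (a sketch; the file names are hypothetical):

    from tsg.main import sample_transcripts

    sample_transcripts(
        input_transcripts_file="transcript_numbers.csv",
        input_annotations_file="annotations.gtf",
        prob_inclusion=0.05,
        output_transcripts_file="generated_transcripts.csv",
        output_annotations_file="generated_annotations.gtf",
    )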