Skip to content
Snippets Groups Projects
Commit ca55b93c authored by BIOPZ-Bak Maciej's avatar BIOPZ-Bak Maciej
Browse files

feat: add Snakemake profiles

parent b59e91e8
No related branches found
No related tags found
1 merge request!97feat: add Snakemake profiles
Showing
with 664 additions and 93 deletions
......@@ -12,8 +12,8 @@ test:
# add code quality tests here
# add unit tests here
# add script tests here
- bash tests/test_scripts_prepare_inputs_table/test.sh
# - bash tests/test_scripts_prepare_inputs_labkey/test.sh
#- bash tests/test_scripts_prepare_inputs_table/test.sh
#- bash tests/test_scripts_prepare_inputs_labkey/test.sh
#- bash tests/test_alfa/test.sh
# add integration tests here
- bash tests/test_integration_workflow_with_conda/test.local.sh
......
......@@ -137,12 +137,12 @@ or
bash tests/test_integration_workflow_with_conda/test.slurm.sh
```
> **NOTE:** Depending on the configuration of your Slurm installation or if
> using a different workload manager, you may need to adapt file `cluster.json`
> and the arguments to options `--config`, `--cores` and `--jobs` in the file
> `test.slurm.sh`, both located in directory `tests/test_integration_workflow`.
> **NOTE:** Depending on the configuration of your Slurm installation you may
> need to adapt file `slurm-config.json` (located directly under `profiles`
> directory) and the arguments to options `--cores` and `--jobs`
> in the file `config.yaml` of a respective profile.
> Consult the manual of your workload manager as well as the section of the
> Snakemake manual dealing with [cluster execution].
> Snakemake manual dealing with [profiles].
## Running the workflow on your own samples
......@@ -154,13 +154,11 @@ create a directory for your workflow run and traverse inside it with:
cd config/my_run
```
2. Create empty sample table, workflow configuration and, if necessary, cluster
configuration files:
2. Create an empty sample table and a workflow configuration file:
```bash
touch samples.tsv
touch config.yaml
touch cluster.json
```
3. Use your editor of choice to populate these files with appropriate
......@@ -169,12 +167,12 @@ files should look like, specifically:
- [samples.tsv](tests/input_files/samples.tsv)
- [config.yaml](tests/input_files/config.yaml)
- [cluster.json](tests/input_files/cluster.json)
4. Create a runner script. Pick one of the following choices for either local
or cluster execution. Before execution of the respective command, you must
replace the data directory placeholders in the argument of the
`--singularity-args` option with a comma-separated list of _all_ directories
or cluster execution. Before execution of the respective command, you need to
remember to update the argument of the `--singularity-args` option of a
respective profile (file: `profiles/{profile}/config.yaml`) so that
it contains a comma-separated list of _all_ directories
containing input data files (samples and any annoation files etc) required for
your run.
......@@ -183,21 +181,19 @@ your run.
```bash
cat << "EOF" > run.sh
#!/bin/bash
snakemake \
--snakefile="/path/to/Snakefile" \
--configfile="config.yaml" \
--cores=4 \
--printshellcmds \
--rerun-incomplete \
--use-singularity \
--singularity-args="--bind <data_dir_1>,<data_dir_2>,<data_dir_n>"
--profile="../profiles/local-singularity" \
--configfile="config.yaml"
EOF
```
**OR**
Runner script for _Slurm cluster exection_ (note that you may need
to modify the arguments to `--cluster` and `--cores` depending on your HPC
to modify the arguments to `--jobs` and `--cores` in the file:
`profiles/slurm-singularity/config.yaml` depending on your HPC
and workload manager configuration):
```bash
......@@ -205,20 +201,13 @@ your run.
#!/bin/bash
mkdir -p logs/cluster_log
snakemake \
--snakefile="/path/to/Snakefile" \
--configfile="config.yaml" \
--cluster-config="cluster.json" \
--cluster="sbatch --cpus-per-task={cluster.threads} --mem={cluster.mem} --qos={cluster.queue} --time={cluster.time} --job-name={cluster.name} -o {cluster.out} -p scicore" \
--cores=256 \
--jobs=256 \
--printshellcmds \
--rerun-incomplete \
--use-singularity \
--singularity-args="--bind <data_dir_1>,<data_dir_2>,<data_dir_n>"
--profile="../profiles/slurm-singularity" \
--configfile="config.yaml"
EOF
```
When running the pipeline with conda you should use the `--use-conda` flag instead of `--use-singularity` and `--singularity-args`.
When running the pipeline with *conda* you should use `local-conda` and
`slurm-conda` profiles instead.
5. Start your workflow run:
......@@ -335,7 +324,7 @@ Molecule | molecule
Contaminant sequences | contaminant_seqs
[conda]: <https://docs.conda.io/projects/conda/en/latest/index.html>
[cluster execution]: <https://snakemake.readthedocs.io/en/stable/executing/cluster-cloud.html#cluster-execution>
[profiles]: <https://snakemake.readthedocs.io/en/stable/executing/cli.html#profiles>
[labkey]: <https://www.labkey.com/>
[miniconda-installation]: <https://docs.conda.io/en/latest/miniconda.html>
[rule-graph]: images/rule_graph.svg
......
#
# Based on lsf CookieCutter.py
#
import os
import json
d = os.path.dirname(__file__)
with open(os.path.join(d, "slurm-settings.json")) as fh:
settings = json.load(fh)
class CookieCutter:
SBATCH_DEFAULTS = settings['SBATCH_DEFAULTS']
CLUSTER_NAME = settings['CLUSTER_NAME']
CLUSTER_CONFIG = settings['CLUSTER_CONFIG']
ADVANCED_ARGUMENT_CONVERSION = settings['ADVANCED_ARGUMENT_CONVERSION']
@staticmethod
def get_cluster_option() -> str:
cluster = CookieCutter.CLUSTER_NAME
if cluster != "":
return f"--cluster={cluster}"
return ""
@staticmethod
def get_advanced_argument_conversion() -> bool:
val = {"yes": True, "no": False}[
CookieCutter.ADVANCED_ARGUMENT_CONVERSION
]
return val
snakefile: "../../workflow/Snakefile"
printshellcmds: true
dryrun: true
verbose: true
notemp: true
no-hooks: true
snakefile: "../../workflow/Snakefile"
cores: 4
printshellcmds: true
rerun-incomplete: true
use-conda: true
notemp: true
no-hooks: true
verbose: true
snakefile: "../../workflow/Snakefile"
cores: 4
printshellcmds: true
rerun-incomplete: true
use-singularity: true
singularity-args: "--bind ./../input_files,./../../images"
notemp: true
no-hooks: true
verbose: true
jobscript: "../slurm-jobscript.sh"
cluster: "../slurm-submit.py"
cluster-status: "../slurm-status.py"
snakefile: "../../workflow/Snakefile"
cores: 256
jobs: 256
printshellcmds: true
rerun-incomplete: true
use-conda: true
notemp: true
no-hooks: true
verbose: true
{
"__default__" :
{
"queue": "6hours",
"qos": "6hours",
"time": "01:00:00",
"threads": "1",
"cpus-per-task": "1",
"mem": "4G",
"name": "{rule}.{wildcards}",
"out": "logs/cluster/{rule}.{wildcards}-%j-%N.out"
"job-name": "{rule}.{wildcards}",
"output": "{params.cluster_log_path}/{rule}.{wildcards}-%j-%N.out"
},
"create_index_star":
{
"qos": "6hours",
"time": "06:00:00",
"threads":"12",
"cpus-per-task":"12",
"mem":"45G"
},
"extract_transcripts_as_bed12":
{
"qos": "30min",
"time": "00:30:00",
"threads":"1",
"cpus-per-task":"1",
"mem":"1G"
},
"extract_transcriptome":
{
"qos": "30min",
"time": "00:30:00",
"threads":"1",
"cpus-per-task":"1",
"mem":"1G"
},
"extract_decoys_salmon":
{
"qos": "30min",
"time": "00:30:00",
"threads":"1",
"cpus-per-task":"1",
"mem":"10G"
},
"concatenate_transcriptome_and_genome":
{
"qos": "30min",
"time": "00:30:00",
"threads":"1",
"cpus-per-task":"1",
"mem":"10G"
},
"create_index_salmon":
{
"qos": "6hours",
"time": "03:00:00",
"threads":"8",
"cpus-per-task":"8",
"mem":"40G"
},
"sort_bed_4_big":
{
"qos": "6hours",
"time": "03:00:00",
"threads":"8",
"cpus-per-task":"8",
"mem":"20G"
},
"create_index_kallisto":
{
"qos": "30min",
"time": "00:30:00",
"threads":"1",
"cpus-per-task":"1",
"mem":"10G"
},
"index_genomic_alignment_samtools":
{
"qos": "30min",
"time": "00:30:00",
"threads":"1",
"cpus-per-task":"1",
"mem":"500M"
},
"star_rpm":
{
"qos": "30min",
"time": "00:30:00",
"threads":"4",
"cpus-per-task":"4",
"mem":"15G"
},
"rename_star_rpm_for_alfa":
{
"qos": "6hours",
"time": "03:00:00",
"threads":"1",
"cpus-per-task":"1",
"mem":"32G"
},
"calculate_TIN_scores":
{
"qos": "6hours",
"time": "06:00:00",
"threads":"8",
"cpus-per-task":"8",
"mem":"15G"
},
"merge_TIN_scores":
{
"qos": "30min",
"time": "00:05:00",
"threads":"1",
"cpus-per-task":"1",
"mem":"500M"
},
"plot_TIN_scores":
{
"qos": "30min",
"time": "00:05:00",
"threads":"1",
"cpus-per-task":"1",
"mem":"500M"
},
"salmon_quantmerge_genes":
{
"qos": "30min",
"time": "00:05:00",
"threads":"1",
"cpus-per-task":"1",
"mem":"500M"
},
"salmon_quantmerge_transcripts":
{
"qos": "30min",
"time": "00:05:00",
"threads":"1",
"cpus-per-task":"1",
"mem":"500M"
},
"generate_alfa_index":
{
"qos": "6hours",
"time": "02:00:00",
"threads":"4",
"cpus-per-task":"4",
"mem":"1G"
},
"alfa_qc":
{
"qos": "30min",
"time": "00:30:00",
"threads":"1",
"cpus-per-task":"1",
"mem":"500M"
},
"alfa_qc_all_samples":
{
"qos": "6hours",
"time": "01:00:00",
"threads":"1",
"cpus-per-task":"1",
"mem":"500M"
},
"pe_fastqc":
{
"qos": "6hours",
"time": "01:00:00",
"threads":"2",
"cpus-per-task":"2",
"mem":"1G"
},
"fastqc":
{
"qos": "6hours",
"time": "01:00:00",
"threads":"2",
"cpus-per-task":"2",
"mem":"1G"
},
"pe_remove_adapters_cutadapt":
{
"qos": "6hours",
"time": "06:00:00",
"threads":"8",
"cpus-per-task":"8",
"mem":"1G"
},
"remove_adapters_cutadapt":
{
"qos": "6hours",
"time": "06:00:00",
"threads":"8",
"cpus-per-task":"8",
"mem":"1G"
},
"pe_remove_polya_cutadapt":
{
"qos": "6hours",
"time": "06:00:00",
"threads":"8",
"cpus-per-task":"8",
"mem":"1G"
},
"remove_polya_cutadapt":
{
"qos": "6hours",
"time": "06:00:00",
"threads":"8",
"cpus-per-task":"8",
"mem":"1G"
},
"pe_map_genome_star":
{
"qos": "6hours",
"time": "06:00:00",
"threads":"12",
"cpus-per-task":"12",
"mem":"50G"
},
"map_genome_star":
{
"qos": "6hours",
"time": "06:00:00",
"threads":"12",
"cpus-per-task":"12",
"mem":"50G"
},
"pe_quantification_salmon":
{
"qos": "6hours",
"time": "03:00:00",
"threads":"6",
"cpus-per-task":"6",
"mem":"20G"
},
"quantification_salmon":
{
"qos": "6hours",
"time": "03:00:00",
"threads":"6",
"cpus-per-task":"6",
"mem":"20G"
},
"pe_genome_quantification_kallisto":
{
"qos": "6hours",
"time": "03:00:00",
"threads":"8",
"cpus-per-task":"8",
"mem":"15G"
},
"genome_quantification_kallisto":
{
"qos": "6hours",
"time": "03:00:00",
"threads":"8",
"cpus-per-task":"8",
"mem":"15G"
}
}
#!/bin/bash
# properties = {properties}
{exec_job}
{
"SBATCH_DEFAULTS": "",
"CLUSTER_NAME": "",
"CLUSTER_CONFIG": "slurm-config.json",
"ADVANCED_ARGUMENT_CONVERSION": "no"
}
jobscript: "../slurm-jobscript.sh"
cluster: "../slurm-submit.py"
cluster-status: "../slurm-status.py"
snakefile: "../../workflow/Snakefile"
cores: 256
jobs: 256
printshellcmds: true
rerun-incomplete: true
use-singularity: true
singularity-args: "--bind ./../input_files,./../../images"
notemp: true
no-hooks: true
verbose: true
#!/usr/bin/env python3
import re
import subprocess as sp
import shlex
import sys
import time
import logging
from CookieCutterSlurm import CookieCutter
logger = logging.getLogger("__name__")
STATUS_ATTEMPTS = 20
jobid = sys.argv[1]
cluster = CookieCutter.get_cluster_option()
for i in range(STATUS_ATTEMPTS):
try:
sacct_res = sp.check_output(shlex.split(f"sacct {cluster} -P -b -j {jobid} -n"))
res = {
x.split("|")[0]: x.split("|")[1]
for x in sacct_res.decode().strip().split("\n")
}
break
except sp.CalledProcessError as e:
logger.error("sacct process error")
logger.error(e)
except IndexError as e:
logger.error(e)
pass
# Try getting job with scontrol instead in case sacct is misconfigured
try:
sctrl_res = sp.check_output(
shlex.split(f"scontrol {cluster} -o show job {jobid}")
)
m = re.search(r"JobState=(\w+)", sctrl_res.decode())
res = {jobid: m.group(1)}
break
except sp.CalledProcessError as e:
logger.error("scontrol process error")
logger.error(e)
if i >= STATUS_ATTEMPTS - 1:
print("failed")
exit(0)
else:
time.sleep(1)
status = res[jobid]
if status == "BOOT_FAIL":
print("failed")
elif status == "OUT_OF_MEMORY":
print("failed")
elif status.startswith("CANCELLED"):
print("failed")
elif status == "COMPLETED":
print("success")
elif status == "DEADLINE":
print("failed")
elif status == "FAILED":
print("failed")
elif status == "NODE_FAIL":
print("failed")
elif status == "PREEMPTED":
print("failed")
elif status == "TIMEOUT":
print("failed")
elif status == "SUSPENDED":
print("running")
else:
print("running")
#!/usr/bin/env python3
"""
Snakemake SLURM submit script.
"""
from snakemake.utils import read_job_properties
import slurm_utils
from CookieCutterSlurm import CookieCutter
# cookiecutter arguments
SBATCH_DEFAULTS = CookieCutter.SBATCH_DEFAULTS
CLUSTER = CookieCutter.get_cluster_option()
CLUSTER_CONFIG = CookieCutter.CLUSTER_CONFIG
ADVANCED_ARGUMENT_CONVERSION = CookieCutter.get_advanced_argument_conversion()
RESOURCE_MAPPING = {
"time": ("time", "runtime", "walltime"),
"mem": ("mem", "mem_mb", "ram", "memory"),
"mem-per-cpu": ("mem-per-cpu", "mem_per_cpu", "mem_per_thread"),
"nodes": ("nodes", "nnodes"),
}
# parse job
jobscript = slurm_utils.parse_jobscript()
job_properties = read_job_properties(jobscript)
sbatch_options = {}
cluster_config = slurm_utils.load_cluster_config(CLUSTER_CONFIG)
# 1) sbatch default arguments and cluster
sbatch_options.update(slurm_utils.parse_sbatch_defaults(SBATCH_DEFAULTS))
sbatch_options.update(slurm_utils.parse_sbatch_defaults(CLUSTER))
# 2) cluster_config defaults
sbatch_options.update(cluster_config["__default__"])
# 3) Convert resources (no unit conversion!) and threads
sbatch_options.update(
slurm_utils.convert_job_properties(job_properties, RESOURCE_MAPPING)
)
# 4) cluster_config for particular rule
sbatch_options.update(cluster_config.get(job_properties.get("rule"), {}))
# 5) cluster_config options
sbatch_options.update(job_properties.get("cluster", {}))
# 6) Advanced conversion of parameters
if ADVANCED_ARGUMENT_CONVERSION:
sbatch_options = slurm_utils.advanced_argument_conversion(sbatch_options)
# 7) Format pattern in snakemake style
sbatch_options = slurm_utils.format_values(sbatch_options, job_properties)
# ensure sbatch output dirs exist
for o in ("output", "error"):
slurm_utils.ensure_dirs_exist(sbatch_options[o]) if o in sbatch_options else None
# submit job and echo id back to Snakemake (must be the only stdout)
print(slurm_utils.submit_job(jobscript, **sbatch_options))
#!/usr/bin/env python3
import os
import sys
from os.path import dirname
import re
import math
import argparse
import subprocess as sp
from io import StringIO
from snakemake import io
from snakemake.io import Wildcards
from snakemake.utils import SequenceFormatter
from snakemake.utils import AlwaysQuotedFormatter
from snakemake.utils import QuotedFormatter
from snakemake.exceptions import WorkflowError
from snakemake.logging import logger
from CookieCutterSlurm import CookieCutter
def _convert_units_to_mb(memory):
"""If memory is specified with SI unit, convert to MB"""
if isinstance(memory, int) or isinstance(memory, float):
return int(memory)
siunits = {"K": 1e-3, "M": 1, "G": 1e3, "T": 1e6}
regex = re.compile(r"(\d+)({})$".format("|".join(siunits.keys())))
m = regex.match(memory)
if m is None:
logger.error(
(
f"unsupported memory specification '{memory}';"
" allowed suffixes: [K|M|G|T]"
)
)
sys.exit(1)
factor = siunits[m.group(2)]
return int(int(m.group(1)) * factor)
def parse_jobscript():
"""Minimal CLI to require/only accept single positional argument."""
p = argparse.ArgumentParser(description="SLURM snakemake submit script")
p.add_argument("jobscript", help="Snakemake jobscript with job properties.")
return p.parse_args().jobscript
def parse_sbatch_defaults(parsed):
"""Unpack SBATCH_DEFAULTS."""
d = parsed.split() if type(parsed) == str else parsed
args = {}
for keyval in [a.split("=") for a in d]:
k = keyval[0].strip().strip("-")
v = keyval[1].strip() if len(keyval) == 2 else None
args[k] = v
return args
def load_cluster_config(path):
"""Load config to dict
Load configuration to dict either from absolute path or relative
to profile dir.
"""
if path:
path = os.path.join(dirname(__file__), os.path.expandvars(path))
dcc = io.load_configfile(path)
else:
dcc = {}
if "__default__" not in dcc:
dcc["__default__"] = {}
return dcc
# adapted from format function in snakemake.utils
def format(_pattern, _quote_all=False, **kwargs): # noqa: A001
"""Format a pattern in Snakemake style.
This means that keywords embedded in braces are replaced by any variable
values that are available in the current namespace.
"""
fmt = SequenceFormatter(separator=" ")
if _quote_all:
fmt.element_formatter = AlwaysQuotedFormatter()
else:
fmt.element_formatter = QuotedFormatter()
try:
return fmt.format(_pattern, **kwargs)
except KeyError as ex:
raise NameError(
f"The name {ex} is unknown in this context. Please "
"make sure that you defined that variable. "
"Also note that braces not used for variable access "
"have to be escaped by repeating them "
)
# adapted from Job.format_wildcards in snakemake.jobs
def format_wildcards(string, job_properties):
""" Format a string with variables from the job. """
class Job(object):
def __init__(self, job_properties):
for key in job_properties:
setattr(self, key, job_properties[key])
job = Job(job_properties)
if "params" in job_properties:
job._format_params = Wildcards(fromdict=job_properties["params"])
else:
job._format_params = None
if "wildcards" in job_properties:
job._format_wildcards = Wildcards(fromdict=job_properties["wildcards"])
else:
job._format_wildcards = None
_variables = dict()
_variables.update(
dict(params=job._format_params, wildcards=job._format_wildcards)
)
if hasattr(job, "rule"):
_variables.update(dict(rule=job.rule))
try:
return format(string, **_variables)
except NameError as ex:
raise WorkflowError(
"NameError with group job {}: {}".format(job.jobid, str(ex))
)
except IndexError as ex:
raise WorkflowError(
"IndexError with group job {}: {}".format(job.jobid, str(ex))
)
# adapted from ClusterExecutor.cluster_params function in snakemake.executor
def format_values(dictionary, job_properties):
formatted = dictionary.copy()
for key, value in list(formatted.items()):
if key == "mem":
value = str(_convert_units_to_mb(value))
if isinstance(value, str):
try:
formatted[key] = format_wildcards(value, job_properties)
except NameError as e:
msg = "Failed to format cluster config " "entry for job {}.".format(
job_properties["rule"]
)
raise WorkflowError(msg, e)
return formatted
def convert_job_properties(job_properties, resource_mapping=None):
options = {}
if resource_mapping is None:
resource_mapping = {}
resources = job_properties.get("resources", {})
for k, v in resource_mapping.items():
options.update({k: resources[i] for i in v if i in resources})
if "threads" in job_properties:
options["cpus-per-task"] = job_properties["threads"]
return options
def ensure_dirs_exist(path):
"""Ensure output folder for Slurm log files exist."""
di = dirname(path)
if di == "":
return
if not os.path.exists(di):
os.makedirs(di, exist_ok=True)
return
def format_sbatch_options(**sbatch_options):
"""Format sbatch options"""
options = []
for k, v in sbatch_options.items():
val = ""
if v is not None:
val = f"={v}"
options.append(f"--{k}{val}")
return options
def submit_job(jobscript, **sbatch_options):
"""Submit jobscript and return jobid."""
options = format_sbatch_options(**sbatch_options)
try:
cmd = ["sbatch"] + ["--parsable"] + options + [jobscript]
res = sp.check_output(cmd)
except sp.CalledProcessError as e:
raise e
# Get jobid
res = res.decode()
try:
jobid = re.search(r"(\d+)", res).group(1)
except Exception as e:
raise e
return jobid
def advanced_argument_conversion(arg_dict):
"""Experimental adjustment of sbatch arguments to the given or default partition."""
# Currently not adjusting for multiple node jobs
nodes = int(arg_dict.get("nodes", 1))
if nodes > 1:
return arg_dict
partition = arg_dict.get("partition", None) or _get_default_partition()
constraint = arg_dict.get("constraint", None)
ncpus = int(arg_dict.get("cpus-per-task", 1))
runtime = arg_dict.get("time", None)
memory = _convert_units_to_mb(arg_dict.get("mem", 0))
config = _get_cluster_configuration(partition, constraint, memory)
mem = arg_dict.get("mem", ncpus * min(config["MEMORY_PER_CPU"]))
mem = _convert_units_to_mb(mem)
if mem > max(config["MEMORY"]):
logger.info(
f"requested memory ({mem}) > max memory ({max(config['MEMORY'])}); "
"adjusting memory settings"
)
mem = max(config["MEMORY"])
# Calculate available memory as defined by the number of requested
# cpus times memory per cpu
AVAILABLE_MEM = ncpus * min(config["MEMORY_PER_CPU"])
# Add additional cpus if memory is larger than AVAILABLE_MEM
if mem > AVAILABLE_MEM:
logger.info(
f"requested memory ({mem}) > "
f"ncpus x MEMORY_PER_CPU ({AVAILABLE_MEM}); "
"trying to adjust number of cpus up"
)
ncpus = int(math.ceil(mem / min(config["MEMORY_PER_CPU"])))
if ncpus > max(config["CPUS"]):
logger.info(
f"ncpus ({ncpus}) > available cpus ({max(config['CPUS'])}); "
"adjusting number of cpus down"
)
ncpus = min(int(max(config["CPUS"])), ncpus)
adjusted_args = {"mem": int(mem), "cpus-per-task": ncpus}
# Update time. If requested time is larger than maximum allowed time, reset
if runtime:
runtime = time_to_minutes(runtime)
time_limit = max(config["TIMELIMIT_MINUTES"])
if runtime > time_limit:
logger.info(
f"time (runtime) > time limit {time_limit}; " "adjusting time down"
)
adjusted_args["time"] = time_limit
# update and return
arg_dict.update(adjusted_args)
return arg_dict
timeformats = [
re.compile(r"^(?P<days>\d+)-(?P<hours>\d+):(?P<minutes>\d+):(?P<seconds>\d+)$"),
re.compile(r"^(?P<days>\d+)-(?P<hours>\d+):(?P<minutes>\d+)$"),
re.compile(r"^(?P<days>\d+)-(?P<hours>\d+)$"),
re.compile(r"^(?P<hours>\d+):(?P<minutes>\d+):(?P<seconds>\d+)$"),
re.compile(r"^(?P<minutes>\d+):(?P<seconds>\d+)$"),
re.compile(r"^(?P<minutes>\d+)$"),
]
def time_to_minutes(time):
"""Convert time string to minutes.
According to slurm:
Acceptable time formats include "minutes", "minutes:seconds",
"hours:minutes:seconds", "days-hours", "days-hours:minutes"
and "days-hours:minutes:seconds".
"""
if not isinstance(time, str):
time = str(time)
d = {"days": 0, "hours": 0, "minutes": 0, "seconds": 0}
regex = list(filter(lambda regex: regex.match(time) is not None, timeformats))
if len(regex) == 0:
return
assert len(regex) == 1, "multiple time formats match"
m = regex[0].match(time)
d.update(m.groupdict())
minutes = (
int(d["days"]) * 24 * 60
+ int(d["hours"]) * 60
+ int(d["minutes"])
+ math.ceil(int(d["seconds"]) / 60)
)
assert minutes > 0, "minutes has to be greater than 0"
return minutes
def _get_default_partition():
"""Retrieve default partition for cluster"""
cluster = CookieCutter.get_cluster_option()
cmd = f"sinfo -O partition {cluster}"
res = sp.check_output(cmd.split())
m = re.search(r"(?P<partition>\S+)\*", res.decode(), re.M)
partition = m.group("partition")
return partition
def _get_cluster_configuration(partition, constraints=None, memory=0):
"""Retrieve cluster configuration.
Retrieve cluster configuration for a partition filtered by
constraints, memory and cpus
"""
try:
import pandas as pd
except ImportError:
print(
"Error: currently advanced argument conversion "
"depends on 'pandas'.", file=sys.stderr
)
sys.exit(1)
if constraints:
constraint_set = set(constraints.split(","))
cluster = CookieCutter.get_cluster_option()
cmd = f"sinfo -e -o %all -p {partition} {cluster}".split()
try:
output = sp.Popen(" ".join(cmd), shell=True, stdout=sp.PIPE).communicate()
except Exception as e:
print(e)
raise
data = re.sub("^CLUSTER:.+\n", "", re.sub(" \\|", "|", output[0].decode()))
df = pd.read_csv(StringIO(data), sep="|")
try:
df["TIMELIMIT_MINUTES"] = df["TIMELIMIT"].apply(time_to_minutes)
df["MEMORY_PER_CPU"] = df["MEMORY"] / df["CPUS"]
df["FEATURE_SET"] = df["AVAIL_FEATURES"].str.split(",").apply(set)
except Exception as e:
print(e)
raise
if constraints:
constraint_set = set(constraints.split(","))
i = df["FEATURE_SET"].apply(lambda x: len(x.intersection(constraint_set)) > 0)
df = df.loc[i]
memory = min(_convert_units_to_mb(memory), max(df["MEMORY"]))
df = df.loc[df["MEMORY"] >= memory]
return df
......@@ -17,6 +17,10 @@
"type": "string",
"description": "Path to log directory."
},
"cluster_log_dir": {
"type": "string",
"description": "Path to cluster log directory."
},
"kallisto_indexes": {
"type": "string",
"description": "Path to kallisto indexes directory."
......
......@@ -2,6 +2,7 @@
samples: "../input_files/samples.multiple_lanes.tsv"
output_dir: "results"
log_dir: "logs"
cluster_log_dir: "logs/cluster"
kallisto_indexes: "results/kallisto_indexes"
salmon_indexes: "results/salmon_indexes"
star_indexes: "results/star_indexes"
......
......@@ -3,6 +3,7 @@
samples: "../input_files/samples.tsv"
output_dir: "results"
log_dir: "logs"
cluster_log_dir: "logs/cluster"
kallisto_indexes: "results/kallisto_indexes"
salmon_indexes: "results/salmon_indexes"
star_indexes: "results/star_indexes"
......
......@@ -20,13 +20,7 @@ cd $script_dir
# Run tests
snakemake \
--snakefile="../../workflow/Snakefile" \
--profile="../../profiles/graphs" \
--configfile="../input_files/config.yaml" \
--dag \
--printshellcmds \
--dryrun \
--verbose \
--notemp \
--no-hooks \
| dot -Tsvg > "../../images/dag_test_workflow.svg"
......@@ -20,13 +20,7 @@ cd $script_dir
# Run tests
snakemake \
--snakefile="../../workflow/Snakefile" \
--profile="../../profiles/graphs" \
--configfile="../input_files/config.yaml" \
--rulegraph \
--printshellcmds \
--dryrun \
--verbose \
--notemp \
--no-hooks \
| dot -Tsvg > "../../images/rule_graph.svg"
......@@ -26,16 +26,8 @@ cd $script_dir
# Run tests
snakemake \
--snakefile="../../workflow/Snakefile" \
--configfile="../input_files/config.yaml" \
--cores=4 \
--printshellcmds \
--rerun-incomplete \
--use-singularity \
--singularity-args="--bind ${PWD}/../input_files,${PWD}/../../images" \
--notemp \
--no-hooks \
--verbose
--profile="../../profiles/local-singularity" \
--configfile="../input_files/config.yaml"
# Create a Snakemake report after the workflow execution
snakemake \
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment