__author__ = "Johannes Köster"
__copyright__ = "Copyright 2015, Johannes Köster"
__email__ = "koester@jimmy.harvard.edu"
__license__ = "MIT"
import os
import subprocess
import glob
from argparse import ArgumentError
import logging as _logging
import re
import sys
import inspect
import threading
import webbrowser
from functools import partial
import importlib
import shutil
from snakemake.workflow import Workflow
from snakemake.exceptions import print_exception, WorkflowError
from snakemake.logging import setup_logger, logger
from snakemake.io import load_configfile
from snakemake.shell import shell
from snakemake.utils import update_config, available_cpu_count
from snakemake.common import Mode, __version__
def snakemake(snakefile,
              report=None,
              listrules=False,
              list_target_rules=False,
              cores=1,
              nodes=1,
              local_cores=1,
              resources=None,
              config=None,
              configfile=None,
              config_args=None,
              workdir=None,
              targets=None,
              dryrun=False,
              touch=False,
              forcetargets=False,
              forceall=False,
              forcerun=None,
              until=None,
              omit_from=None,
              prioritytargets=None,
              stats=None,
              printreason=False,
              printshellcmds=False,
              debug_dag=False,
              printdag=False,
              printrulegraph=False,
              printd3dag=False,
              nocolor=False,
              quiet=False,
              keepgoing=False,
              cluster=None,
              cluster_config=None,
              cluster_sync=None,
              drmaa=None,
              drmaa_log_dir=None,
              jobname="snakejob.{rulename}.{jobid}.sh",
              immediate_submit=False,
              standalone=False,
              ignore_ambiguity=False,
              snakemakepath=None,
              lock=True,
              unlock=False,
              cleanup_metadata=None,
              cleanup_conda=False,
              cleanup_shadow=False,
              force_incomplete=False,
              ignore_incomplete=False,
              list_version_changes=False,
              list_code_changes=False,
              list_input_changes=False,
              list_params_changes=False,
              list_untracked=False,
              list_resources=False,
              summary=False,
              archive=None,
              delete_all_output=False,
              delete_temp_output=False,
              detailed_summary=False,
              latency_wait=3,
              wait_for_files=None,
              print_compilation=False,
              debug=False,
              notemp=False,
              keep_remote_local=False,
              nodeps=False,
              keep_target_files=False,
              allowed_rules=None,
              jobscript=None,
              greediness=None,
              no_hooks=False,
              overwrite_shellcmd=None,
              updated_files=None,
              log_handler=None,
              keep_logger=False,
              max_jobs_per_second=None,
              max_status_checks_per_second=100,
              restart_times=0,
              attempt=1,
              verbose=False,
              force_use_threads=False,
              use_conda=False,
              use_singularity=False,
              singularity_args="",
              conda_prefix=None,
              list_conda_envs=False,
              singularity_prefix=None,
              shadow_prefix=None,
              create_envs_only=False,
              mode=Mode.default,
              wrapper_prefix=None,
              kubernetes=None,
              kubernetes_envvars=None,
              container_image=None,
              default_remote_provider=None,
              default_remote_prefix="",
              assume_shared_fs=True,
              cluster_status=None,
              export_cwl=None):
    """Run snakemake on a given snakefile.

    This function provides access to the whole snakemake functionality. It is not thread-safe
    (among other things it changes the process working directory when ``workdir`` is given).

    Args:
        snakefile (str): the path to the snakefile
        report (str): create an HTML report for a previous run at the given path
        listrules (bool): list rules (default False)
        list_target_rules (bool): list target rules (default False)
        cores (int): the number of provided cores (ignored when using cluster support) (default 1)
        nodes (int): the number of provided cluster nodes (ignored without cluster support) (default 1)
        local_cores (int): the number of provided local cores if in cluster mode (ignored without cluster support) (default 1)
        resources (dict): provided resources, a dictionary assigning integers to resource names, e.g. {gpu=1, io=5} (default None, treated as {})
        config (dict): override values for workflow config; takes precedence over values loaded from ``configfile`` (default None, treated as {})
        configfile (str): path to a config file whose values override the workflow config (default None)
        config_args (list): ``key=value`` strings describing ``config``; if None and ``config`` is non-empty they are derived from ``config`` via ``unparse_config`` (default None)
        workdir (str): path to working directory; created if missing (default None)
        targets (list): list of targets, e.g. rule or file names (default None)
        dryrun (bool): only dry-run the workflow (default False)
        touch (bool): only touch all output files if present (default False)
        forcetargets (bool): force given targets to be re-created (default False)
        forceall (bool): force all output files to be re-created (default False)
        forcerun (list): list of files and rules that shall be re-created/re-executed (default None, treated as [])
        until (list): rules or files to run the workflow until; forwarded to ``Workflow.execute`` (default None, treated as [])
        omit_from (list): rules or files to omit from execution; forwarded to ``Workflow.execute`` (default None, treated as [])
        prioritytargets (list): list of targets that shall be run with maximum priority (default None, treated as [])
        stats (str): path to file that shall contain stats about the workflow execution (default None)
        printreason (bool): print the reason for the execution of each job (default False)
        printshellcmds (bool): print the shell command of each job (default False)
        debug_dag (bool): enable DAG debugging output; forwarded to the logger setup (default False)
        printdag (bool): print the dag in the graphviz dot language (default False)
        printrulegraph (bool): print the graph of rules in the graphviz dot language (default False)
        printd3dag (bool): print a D3.js compatible JSON representation of the DAG (default False)
        nocolor (bool): do not print colored output (default False)
        quiet (bool): do not print any default job information (default False)
        keepgoing (bool): keep going upon errors (default False)
        cluster (str): submission command of a cluster or batch system to use, e.g. qsub (default None)
        cluster_config (str,list): configuration file for cluster options, or list thereof; later files override earlier ones (default None)
        cluster_sync (str): blocking cluster submission command (like SGE 'qsub -sync y') (default None)
        drmaa (str): if not None use DRMAA for cluster support, str specifies native args passed to the cluster when submitting a job
        drmaa_log_dir (str): the path to stdout and stderr output of DRMAA jobs (default None)
        jobname (str): naming scheme for cluster job scripts (default "snakejob.{rulename}.{jobid}.sh")
        immediate_submit (bool): immediately submit all cluster jobs, regardless of dependencies; requires ``notemp`` (default False)
        standalone (bool): kill all processes very rudely in case of failure (do not use this if you use this API) (default False) (deprecated)
        ignore_ambiguity (bool): ignore ambiguous rules and always take the first possible one (default False)
        snakemakepath (str): deprecated parameter whose value is ignored. Do not use.
        lock (bool): lock the working directory when executing the workflow (default True)
        unlock (bool): just unlock the working directory (default False)
        cleanup_metadata (list): just cleanup metadata of given list of output files (default None)
        cleanup_conda (bool): just cleanup unused conda environments (default False)
        cleanup_shadow (bool): just cleanup old shadow directories (default False)
        force_incomplete (bool): force the re-creation of incomplete files (default False)
        ignore_incomplete (bool): ignore incomplete files (default False)
        list_version_changes (bool): list output files with changed rule version (default False)
        list_code_changes (bool): list output files with changed rule code (default False)
        list_input_changes (bool): list output files with changed input files (default False)
        list_params_changes (bool): list output files with changed params (default False)
        list_untracked (bool): list files in the workdir that are not used in the workflow (default False)
        list_resources (bool): list resources used in the workflow (default False)
        summary (bool): list summary of all output files and their status (default False)
        archive (str): archive workflow into the given tarball
        delete_all_output (bool): remove all files generated by the workflow (default False)
        delete_temp_output (bool): remove all temporary files generated by the workflow (default False)
        detailed_summary (bool): list summary of all input and output files and their status, including extra info about the input and shell commands (default False)
        latency_wait (int): how many seconds to wait for an output file to appear after the execution of a job, e.g. to handle filesystem latency (default 3)
        wait_for_files (list): wait for given files to be present before executing the workflow
        print_compilation (bool): print the compilation of the snakefile (default False)
        debug (bool): allow to use the debugger within rules; cannot be combined with more than one core or cluster execution
        notemp (bool): ignore temp file flags, e.g. do not delete output files marked as temp after use (default False)
        keep_remote_local (bool): keep local copies of remote files (default False)
        nodeps (bool): ignore dependencies (default False)
        keep_target_files (bool): do not adjust the paths of given target files relative to the working directory.
        allowed_rules (set): restrict allowed rules to the given set. If None or empty, all rules are used.
        jobscript (str): path to a custom shell script template for cluster jobs (default None)
        greediness (float): set the greediness of scheduling. This value between 0 and 1 determines how careful jobs are selected for execution. The default value (0.5 if prioritytargets are used, 1.0 else) provides the best speed and still acceptable scheduling quality. Values outside [0, 1] make this function log an error and return False.
        no_hooks (bool): forwarded to ``Workflow.execute``; presumably disables workflow hooks (default False)
        overwrite_shellcmd (str): a shell command that shall be executed instead of those given in the workflow. This is for debugging purposes only.
        updated_files (list): a list that will be filled with the files that are updated or created during the workflow execution
        log_handler (function): redirect snakemake output to this custom log handler (see below) (default None)
        keep_logger (bool): reuse the current logger instead of setting it up and cleaning it up in this call; used for subworkflows (default False)
        max_jobs_per_second (int): maximal number of cluster/drmaa jobs per second, None to impose no limit (default None)
        max_status_checks_per_second (int): maximal number of job status checks per second; forwarded to ``Workflow.execute`` (default 100)
        restart_times (int): number of times to restart failing jobs (default 0)
        attempt (int): initial value of Job.attempt. This is intended for internal use only (default 1).
        verbose (bool): show additional debug output (default False)
        force_use_threads (bool): whether to force use of threads over processes. helpful if shared memory is full or unavailable (default False)
        use_conda (bool): create conda environments for each job (defined with conda directive of rules)
        use_singularity (bool): run jobs in singularity containers (if defined with singularity directive)
        singularity_args (str): additional arguments to pass to singularity
        conda_prefix (str): the directory in which conda environments will be created (default None)
        list_conda_envs (bool): list conda environments and their location on disk.
        singularity_prefix (str): the directory to which singularity images will be pulled (default None)
        shadow_prefix (str): prefix for shadow directories. The job-specific shadow directories will be created in $SHADOW_PREFIX/shadow/ (default None)
        create_envs_only (bool): if specified, only builds the conda environments specified for each job, then exits.
        mode (snakemake.common.Mode): execution mode
        wrapper_prefix (str): prefix for wrapper script URLs (default None)
        kubernetes (str): submit jobs to kubernetes, using the given namespace.
        kubernetes_envvars (list): environment variables that shall be passed to kubernetes jobs.
        container_image (str): Docker image to use, e.g., for kubernetes.
        default_remote_provider (str): default remote provider to use instead of local files (e.g. S3, GS); must name a module in ``snakemake.remote``
        default_remote_prefix (str): prefix for default remote provider (e.g. name of the bucket).
        assume_shared_fs (bool): assume that cluster nodes share a common filesystem (default true).
        cluster_status (str): status command for cluster execution. If None, Snakemake will rely on flag files. Otherwise, it expects the command to return "success", "failure" or "running" when executing with a cluster jobid as single argument.
        export_cwl (str): Compile workflow to CWL and save to given file

    The ``log_handler`` function takes a log message dictionary as its only
    argument. The log message dictionary has the following entries:

        :level:
            the log level ("info", "error", "debug", "progress", "job_info")

        :level="info", "error" or "debug":
            :msg:
                the log message
        :level="progress":
            :done:
                number of already executed jobs

            :total:
                number of total jobs

        :level="job_info":
            :input:
                list of input files of a job

            :output:
                list of output files of a job

            :log:
                path to log file of a job

            :local:
                whether a job is executed locally (i.e. ignoring cluster)

            :msg:
                the job message

            :reason:
                the job reason

            :priority:
                the job priority

            :threads:
                the threads of the job

    Returns:
        bool: True if workflow execution was successful.
    """
    assert not immediate_submit or notemp, (
        "immediate_submit has to be combined with notemp (it does not support temp file handling)")

    # Collection arguments default to None (instead of shared mutable
    # defaults) and are normalized to fresh empty containers here.
    if updated_files is None:
        updated_files = list()
    if resources is None:
        resources = dict()
    if config is None:
        config = dict()
    if forcerun is None:
        forcerun = list()
    if until is None:
        until = list()
    if omit_from is None:
        omit_from = list()
    if prioritytargets is None:
        prioritytargets = list()

    # In cluster mode the core limit is lifted; otherwise the node limit is.
    if cluster or cluster_sync or drmaa:
        cores = sys.maxsize
    else:
        nodes = sys.maxsize

    if isinstance(cluster_config, str):
        # Loading configuration from one file is still supported for
        # backward compatibility
        cluster_config = [cluster_config]
    if cluster_config:
        # Load all configuration files
        configs = [load_configfile(f) for f in cluster_config]
        # Merge in the order as specified, overriding earlier values with
        # later ones
        cluster_config_content = configs[0]
        for other in configs[1:]:
            update_config(cluster_config_content, other)
    else:
        cluster_config_content = dict()

    run_local = not (cluster or cluster_sync or drmaa or kubernetes)
    if run_local and not dryrun:
        # clean up all previously recorded jobids.
        shell.cleanup()

    # force thread use for any kind of cluster
    use_threads = force_use_threads or (os.name != "posix") or cluster or cluster_sync or drmaa
    if not keep_logger:
        stdout = (
            (dryrun and not (printdag or printd3dag or printrulegraph)) or
            listrules or list_target_rules or list_resources
        )
        setup_logger(handler=log_handler,
                     quiet=quiet,
                     printreason=printreason,
                     printshellcmds=printshellcmds,
                     debug_dag=debug_dag,
                     nocolor=nocolor,
                     stdout=stdout,
                     debug=verbose,
                     use_threads=use_threads,
                     mode=mode)

    if greediness is None:
        greediness = 0.5 if prioritytargets else 1.0
    elif not (0 <= greediness <= 1.0):
        logger.error("Error: greediness must be a float between 0 and 1.")
        return False

    if not os.path.exists(snakefile):
        logger.error("Error: Snakefile \"{}\" not present.".format(snakefile))
        return False
    snakefile = os.path.abspath(snakefile)

    cluster_mode = sum(arg is not None for arg in (cluster, cluster_sync, drmaa))
    if cluster_mode > 1:
        logger.error("Error: cluster and drmaa args are mutually exclusive")
        return False
    if debug and (cores > 1 or cluster_mode):
        logger.error(
            "Error: debug mode cannot be used with more than one core or cluster execution.")
        return False

    # Values from ``config`` take precedence over those from ``configfile``.
    overwrite_config = dict()
    if configfile:
        overwrite_config.update(load_configfile(configfile))
        configfile = os.path.abspath(configfile)
    if config:
        overwrite_config.update(config)
        if config_args is None:
            config_args = unparse_config(config)

    if workdir:
        olddir = os.getcwd()
        if not os.path.exists(workdir):
            logger.info(
                "Creating specified working directory {}.".format(workdir))
            os.makedirs(workdir)
        workdir = os.path.abspath(workdir)
        os.chdir(workdir)

    logger.setup_logfile()

    # Stays None until the Workflow object has been constructed; used by the
    # error handler and the cleanup below.
    workflow = None
    success = False
    try:
        # handle default remote provider
        _default_remote_provider = None
        if default_remote_provider is not None:
            try:
                rmt = importlib.import_module("snakemake.remote." +
                                              default_remote_provider)
            except ImportError as e:
                raise WorkflowError("Unknown default remote provider.") from e
            if rmt.RemoteProvider.supports_default:
                _default_remote_provider = rmt.RemoteProvider()
            else:
                raise WorkflowError(
                    "Remote provider {} does not (yet) support to "
                    "be used as default provider.".format(default_remote_provider))

        workflow = Workflow(snakefile=snakefile,
                            jobscript=jobscript,
                            overwrite_shellcmd=overwrite_shellcmd,
                            overwrite_config=overwrite_config,
                            overwrite_workdir=workdir,
                            overwrite_configfile=configfile,
                            overwrite_clusterconfig=cluster_config_content,
                            config_args=config_args,
                            debug=debug,
                            use_conda=use_conda or list_conda_envs or cleanup_conda,
                            use_singularity=use_singularity,
                            conda_prefix=conda_prefix,
                            singularity_prefix=singularity_prefix,
                            shadow_prefix=shadow_prefix,
                            singularity_args=singularity_args,
                            mode=mode,
                            wrapper_prefix=wrapper_prefix,
                            printshellcmds=printshellcmds,
                            restart_times=restart_times,
                            attempt=attempt,
                            default_remote_provider=_default_remote_provider,
                            default_remote_prefix=default_remote_prefix,
                            run_local=run_local)
        success = True
        workflow.include(snakefile,
                         overwrite_first_rule=True,
                         print_compilation=print_compilation)
        workflow.check()

        if not print_compilation:
            if listrules:
                workflow.list_rules()
            elif list_target_rules:
                workflow.list_rules(only_targets=True)
            elif list_resources:
                workflow.list_resources()
            else:
                # Recursive entry point used to run subworkflows with the
                # same execution settings as this call.
                subsnakemake = partial(snakemake,
                                       cores=cores,
                                       nodes=nodes,
                                       local_cores=local_cores,
                                       resources=resources,
                                       dryrun=dryrun,
                                       touch=touch,
                                       printreason=printreason,
                                       printshellcmds=printshellcmds,
                                       debug_dag=debug_dag,
                                       nocolor=nocolor,
                                       quiet=quiet,
                                       keepgoing=keepgoing,
                                       cluster=cluster,
                                       cluster_sync=cluster_sync,
                                       drmaa=drmaa,
                                       drmaa_log_dir=drmaa_log_dir,
                                       jobname=jobname,
                                       immediate_submit=immediate_submit,
                                       standalone=standalone,
                                       ignore_ambiguity=ignore_ambiguity,
                                       restart_times=restart_times,
                                       attempt=attempt,
                                       lock=lock,
                                       unlock=unlock,
                                       cleanup_metadata=cleanup_metadata,
                                       cleanup_conda=cleanup_conda,
                                       cleanup_shadow=cleanup_shadow,
                                       force_incomplete=force_incomplete,
                                       ignore_incomplete=ignore_incomplete,
                                       latency_wait=latency_wait,
                                       verbose=verbose,
                                       notemp=notemp,
                                       keep_remote_local=keep_remote_local,
                                       nodeps=nodeps,
                                       jobscript=jobscript,
                                       greediness=greediness,
                                       no_hooks=no_hooks,
                                       overwrite_shellcmd=overwrite_shellcmd,
                                       config=config,
                                       config_args=config_args,
                                       cluster_config=cluster_config,
                                       keep_logger=True,
                                       force_use_threads=use_threads,
                                       use_conda=use_conda,
                                       use_singularity=use_singularity,
                                       conda_prefix=conda_prefix,
                                       singularity_prefix=singularity_prefix,
                                       shadow_prefix=shadow_prefix,
                                       singularity_args=singularity_args,
                                       list_conda_envs=list_conda_envs,
                                       kubernetes=kubernetes,
                                       kubernetes_envvars=kubernetes_envvars,
                                       container_image=container_image,
                                       create_envs_only=create_envs_only,
                                       default_remote_provider=default_remote_provider,
                                       default_remote_prefix=default_remote_prefix,
                                       assume_shared_fs=assume_shared_fs,
                                       cluster_status=cluster_status,
                                       max_jobs_per_second=max_jobs_per_second,
                                       max_status_checks_per_second=max_status_checks_per_second)
                success = workflow.execute(
                    targets=targets,
                    dryrun=dryrun,
                    touch=touch,
                    cores=cores,
                    nodes=nodes,
                    local_cores=local_cores,
                    forcetargets=forcetargets,
                    forceall=forceall,
                    forcerun=forcerun,
                    prioritytargets=prioritytargets,
                    until=until,
                    omit_from=omit_from,
                    quiet=quiet,
                    keepgoing=keepgoing,
                    printshellcmds=printshellcmds,
                    printreason=printreason,
                    printrulegraph=printrulegraph,
                    printdag=printdag,
                    cluster=cluster,
                    cluster_sync=cluster_sync,
                    jobname=jobname,
                    drmaa=drmaa,
                    drmaa_log_dir=drmaa_log_dir,
                    kubernetes=kubernetes,
                    kubernetes_envvars=kubernetes_envvars,
                    container_image=container_image,
                    max_jobs_per_second=max_jobs_per_second,
                    max_status_checks_per_second=max_status_checks_per_second,
                    printd3dag=printd3dag,
                    immediate_submit=immediate_submit,
                    ignore_ambiguity=ignore_ambiguity,
                    stats=stats,
                    force_incomplete=force_incomplete,
                    ignore_incomplete=ignore_incomplete,
                    list_version_changes=list_version_changes,
                    list_code_changes=list_code_changes,
                    list_input_changes=list_input_changes,
                    list_params_changes=list_params_changes,
                    list_untracked=list_untracked,
                    list_conda_envs=list_conda_envs,
                    summary=summary,
                    archive=archive,
                    delete_all_output=delete_all_output,
                    delete_temp_output=delete_temp_output,
                    latency_wait=latency_wait,
                    wait_for_files=wait_for_files,
                    detailed_summary=detailed_summary,
                    nolock=not lock,
                    unlock=unlock,
                    resources=resources,
                    notemp=notemp,
                    keep_remote_local=keep_remote_local,
                    nodeps=nodeps,
                    keep_target_files=keep_target_files,
                    cleanup_metadata=cleanup_metadata,
                    cleanup_conda=cleanup_conda,
                    cleanup_shadow=cleanup_shadow,
                    subsnakemake=subsnakemake,
                    updated_files=updated_files,
                    allowed_rules=allowed_rules,
                    greediness=greediness,
                    no_hooks=no_hooks,
                    force_use_threads=use_threads,
                    create_envs_only=create_envs_only,
                    assume_shared_fs=assume_shared_fs,
                    cluster_status=cluster_status,
                    report=report,
                    export_cwl=export_cwl)

    except BrokenPipeError:
        # ignore this exception and stop. It occurs if snakemake output is piped into less and less quits before reading the whole output.
        # in such a case, snakemake shall stop scheduling and quit with error 1
        success = False
    except BaseException as ex:
        # Deliberately broad (includes KeyboardInterrupt/SystemExit): report
        # the error and signal failure via the return value.
        linemaps = workflow.linemaps if workflow is not None else dict()
        print_exception(ex, linemaps)
        success = False
    finally:
        # Always restore the caller's state, even if error reporting fails.
        if workdir:
            os.chdir(olddir)
        if workflow is not None and workflow.persistence:
            workflow.persistence.unlock()
        if not keep_logger:
            logger.cleanup()
    return success
def parse_resources(args):
    """Parse resources from args.

    Args:
        args: parsed command line namespace whose ``resources`` attribute is
            either None or an iterable of ``name=value`` strings.

    Returns:
        dict: mapping of resource name to integer value (empty if
        ``args.resources`` is None).

    Raises:
        ValueError: if an entry is not a single ``name=value`` pair, the name
            is not a valid identifier, the value is not an integer, or the
            reserved name ``_cores`` is used.
    """
    resources = dict()
    if args.resources is None:
        return resources

    # Raw string: "\w" in a normal literal is an invalid escape sequence.
    valid = re.compile(r"[a-zA-Z_]\w*$")
    for res in args.resources:
        try:
            res, val = res.split("=")
        except ValueError:
            # Hide the internal unpacking error; the message says it all.
            raise ValueError(
                "Resources have to be defined as name=value pairs.") from None
        if not valid.match(res):
            raise ValueError(
                "Resource definition must start with a valid identifier.")
        try:
            val = int(val)
        except ValueError:
            raise ValueError(
                "Resource definition must contain an integer after the identifier.") from None
        if res == "_cores":
            raise ValueError(
                "Resource _cores is already defined internally. Use a different name.")
        resources[res] = val
    return resources
def parse_config(args):
    """Parse config from args.

    Each entry in ``args.config`` must be a ``key=value`` string (only the
    first ``=`` separates key and value). The value is converted by trying,
    in order, ``int``, ``float``, ``eval`` and finally ``str``; the first
    conversion that succeeds and does not yield a callable wins, so e.g.
    ``x=print`` stays the string ``"print"`` rather than the builtin.

    Args:
        args: parsed command line namespace whose ``config`` attribute is
            either None or an iterable of ``key=value`` strings.

    Returns:
        dict: parsed config entries (empty if ``args.config`` is None). A
        value of ``None`` (e.g. from ``key=None``) is allowed.

    Raises:
        ValueError: if an entry has no ``=`` or its key is not a valid
            identifier.
    """
    # NOTE(security): eval executes arbitrary Python taken from the command
    # line, with this module's globals in scope. This is only acceptable
    # because config values come from the invoking user; do not feed
    # untrusted input here (ast.literal_eval would be the safe alternative,
    # but would change which values are accepted).
    parsers = [int, float, eval, str]
    config = dict()
    if args.config is None:
        return config

    # Raw string: "\w" in a normal literal is an invalid escape sequence.
    valid = re.compile(r"[a-zA-Z_]\w*$")
    for entry in args.config:
        try:
            key, val = entry.split("=", 1)
        except ValueError:
            raise ValueError(
                "Config entries have to be defined as name=value pairs.") from None
        if not valid.match(key):
            raise ValueError(
                "Config entry must start with a valid identifier.")
        for parser in parsers:
            try:
                v = parser(val)
            except Exception:
                continue
            # avoid accidental interpretation as function
            if not callable(v):
                break
        # The final parser (str) always succeeds with a non-callable value,
        # so ``v`` is always bound here. It may legitimately be None.
        config[key] = v
    return config
def unparse_config(config):
    """Render a flat config dict as a list of ``key=value`` strings.

    String values are wrapped in single quotes; every other value is
    formatted as-is. Entries keep the dict's iteration order.

    Raises:
        ValueError: if ``config`` is not a dict, or if any of its values is
            itself a dict (only flat configs can be rendered).
    """
    if not isinstance(config, dict):
        raise ValueError("config is not a dict")

    def _render(value):
        # Nested mappings cannot be expressed as a single key=value item.
        if isinstance(value, dict):
            raise ValueError("config may only be a flat dict")
        if isinstance(value, str):
            return "'{}'".format(value)
        return value

    return ["{}={}".format(key, _render(value)) for key, value in config.items()]
# Cached appdirs.AppDirs instance, created on the first get_appdirs() call.
APPDIRS = None


def get_appdirs():
    """Return the module-wide ``AppDirs("snakemake", "snakemake")`` object.

    The instance is created (and ``appdirs`` imported) only on first use and
    then cached in the module-level ``APPDIRS`` variable.
    """
    global APPDIRS
    if APPDIRS is not None:
        return APPDIRS
    from appdirs import AppDirs
    APPDIRS = AppDirs("snakemake", "snakemake")
    return APPDIRS
def get_profile_file(profile, file, return_default=False):
    """Locate ``file`` inside a Snakemake profile directory.

    Args:
        profile (str): profile name or path. For an absolute path, only its
            parent directory is searched (for the profile's basename).
            Otherwise the profile is looked up, in this order, relative to
            the current working directory, the user config directory and the
            site config directory.
        file (str): name of the file to look for inside the profile directory.
        return_default (bool): if True, return ``file`` unchanged when no
            matching path exists (default False).

    Returns:
        str or None: the first existing ``<search_dir>/<profile>/<file>``
        path; otherwise ``file`` if ``return_default`` is set, else None.
    """
    if os.path.isabs(profile):
        search_dirs = [os.path.dirname(profile)]
        profile = os.path.basename(profile)
    else:
        # Only resolve (and lazily import) appdirs when the config
        # directories are actually searched.
        dirs = get_appdirs()
        search_dirs = [os.getcwd(),
                       dirs.user_config_dir,
                       dirs.site_config_dir]
    for search_dir in search_dirs:
        path = os.path.join(search_dir, profile, file)
        if os.path.exists(path):
            return path
    return file if return_default else None
def get_argument_parser(profile=None):
"""Generate and return argument parser."""
import configargparse
from configargparse import YAMLConfigFileParser
dirs = get_appdirs()
config_files = []
if profile:
if profile == "":
print("Error: invalid profile name.", file=sys.stderr)
exit(1)
config_file = get_profile_file(profile, "config.yaml")
if config_file is None:
print("Error: profile given but no config.yaml found. "
"Profile has to be given as either absolute path, relative "
"path or name of a directory available in either "
"{site} or {user}.".format(
site=dirs.site_config_dir,
user=dirs.user_config_dir), file=sys.stderr)
exit(1)
config_files = [config_file]
parser = configargparse.ArgumentParser(
description="Snakemake is a Python based language and execution "
"environment for GNU Make-like workflows.",
default_config_files=config_files,
config_file_parser_class=YAMLConfigFileParser)
group_exec = parser.add_argument_group("EXECUTION")
group_exec.add_argument("target",
nargs="*",
default=None,
help="Targets to build. May be rules or files.")
group_exec.add_argument("--dryrun", "-n",
action="store_true",
help="Do not execute anything, and display what would be done. "
"If you have a very large workflow, use --dryrun --quiet to just "
"print a summary of the DAG of jobs.")
group_exec.add_argument("--profile",
help="""
Name of profile to use for configuring
Snakemake. Snakemake will search for a corresponding
folder in {} and {}. Alternatively, this can be an
absolute or relative path.
The profile folder has to contain a file 'config.yaml'.
This file can be used to set default values for command
line options in YAML format. For example,
'--cluster qsub' becomes 'cluster: qsub' in the YAML
file. Profiles can be obtained from