# Source code for snakemake

__author__ = "Johannes Köster"
__copyright__ = "Copyright 2015, Johannes Köster"
__email__ = "koester@jimmy.harvard.edu"
__license__ = "MIT"

import os
import subprocess
import glob
from argparse import ArgumentError
import logging as _logging
import re
import sys
import inspect
import threading
import webbrowser
from functools import partial
import importlib
import shutil

from snakemake.workflow import Workflow
from snakemake.exceptions import print_exception, WorkflowError
from snakemake.logging import setup_logger, logger
from snakemake.io import load_configfile
from snakemake.shell import shell
from snakemake.utils import update_config, available_cpu_count
from snakemake.common import Mode, __version__


def snakemake(snakefile,
              report=None,
              listrules=False,
              list_target_rules=False,
              cores=1,
              nodes=1,
              local_cores=1,
              resources=dict(),
              config=dict(),
              configfile=None,
              config_args=None,
              workdir=None,
              targets=None,
              dryrun=False,
              touch=False,
              forcetargets=False,
              forceall=False,
              forcerun=[],
              until=[],
              omit_from=[],
              prioritytargets=[],
              stats=None,
              printreason=False,
              printshellcmds=False,
              debug_dag=False,
              printdag=False,
              printrulegraph=False,
              printd3dag=False,
              nocolor=False,
              quiet=False,
              keepgoing=False,
              cluster=None,
              cluster_config=None,
              cluster_sync=None,
              drmaa=None,
              drmaa_log_dir=None,
              jobname="snakejob.{rulename}.{jobid}.sh",
              immediate_submit=False,
              standalone=False,
              ignore_ambiguity=False,
              snakemakepath=None,
              lock=True,
              unlock=False,
              cleanup_metadata=None,
              cleanup_conda=False,
              cleanup_shadow=False,
              force_incomplete=False,
              ignore_incomplete=False,
              list_version_changes=False,
              list_code_changes=False,
              list_input_changes=False,
              list_params_changes=False,
              list_untracked=False,
              list_resources=False,
              summary=False,
              archive=None,
              delete_all_output=False,
              delete_temp_output=False,
              detailed_summary=False,
              latency_wait=3,
              wait_for_files=None,
              print_compilation=False,
              debug=False,
              notemp=False,
              keep_remote_local=False,
              nodeps=False,
              keep_target_files=False,
              allowed_rules=None,
              jobscript=None,
              greediness=None,
              no_hooks=False,
              overwrite_shellcmd=None,
              updated_files=None,
              log_handler=None,
              keep_logger=False,
              max_jobs_per_second=None,
              max_status_checks_per_second=100,
              restart_times=0,
              attempt=1,
              verbose=False,
              force_use_threads=False,
              use_conda=False,
              use_singularity=False,
              singularity_args="",
              conda_prefix=None,
              list_conda_envs=False,
              singularity_prefix=None,
              shadow_prefix=None,
              create_envs_only=False,
              mode=Mode.default,
              wrapper_prefix=None,
              kubernetes=None,
              kubernetes_envvars=None,
              container_image=None,
              default_remote_provider=None,
              default_remote_prefix="",
              assume_shared_fs=True,
              cluster_status=None,
              export_cwl=None):
    """Run snakemake on a given snakefile.

    This function provides access to the whole snakemake functionality.
    It is not thread-safe (it changes the process working directory when
    ``workdir`` is given).

    Args:
        snakefile (str): the path to the snakefile
        report (str): create an HTML report for a previous run at the given path
        listrules (bool): list rules (default False)
        list_target_rules (bool): list target rules (default False)
        cores (int): the number of provided cores (ignored when using cluster support) (default 1)
        nodes (int): the number of provided cluster nodes (ignored without cluster support) (default 1)
        local_cores (int): the number of provided local cores if in cluster mode (ignored without cluster support) (default 1)
        resources (dict): provided resources, a dictionary assigning integers to resource names, e.g. {"gpu": 1, "io": 5} (default {})
        config (dict): override values for workflow config
        configfile (str): path to a config file whose values override the workflow config; entries in ``config`` take precedence over it (default None)
        config_args (list): "key=value" strings describing ``config``; if None and ``config`` is non-empty they are derived from ``config`` (default None)
        workdir (str): path to working directory; created if it does not exist (default None)
        targets (list): list of targets, e.g. rule or file names (default None)
        dryrun (bool): only dry-run the workflow (default False)
        touch (bool): only touch all output files if present (default False)
        forcetargets (bool): force given targets to be re-created (default False)
        forceall (bool): force all output files to be re-created (default False)
        forcerun (list): list of files and rules that shall be re-created/re-executed (default [])
        until (list): forwarded unchanged to Workflow.execute (default [])
        omit_from (list): forwarded unchanged to Workflow.execute (default [])
        prioritytargets (list): list of targets that shall be run with maximum priority (default [])
        stats (str): path to file that shall contain stats about the workflow execution (default None)
        printreason (bool): print the reason for the execution of each job (default False)
        printshellcmds (bool): print the shell command of each job (default False)
        debug_dag (bool): forwarded to the logger setup (default False)
        printdag (bool): print the dag in the graphviz dot language (default False)
        printrulegraph (bool): print the graph of rules in the graphviz dot language (default False)
        printd3dag (bool): print a D3.js compatible JSON representation of the DAG (default False)
        nocolor (bool): do not print colored output (default False)
        quiet (bool): do not print any default job information (default False)
        keepgoing (bool): keep going upon errors (default False)
        cluster (str): submission command of a cluster or batch system to use, e.g. qsub (default None)
        cluster_config (str,list): configuration file for cluster options, or list thereof; later files override earlier ones (default None)
        cluster_sync (str): blocking cluster submission command (like SGE 'qsub -sync y') (default None)
        drmaa (str): if not None use DRMAA for cluster support, str specifies native args passed to the cluster when submitting a job
        drmaa_log_dir (str): the path to stdout and stderr output of DRMAA jobs (default None)
        jobname (str): naming scheme for cluster job scripts (default "snakejob.{rulename}.{jobid}.sh")
        immediate_submit (bool): immediately submit all cluster jobs, regardless of dependencies; requires notemp=True (default False)
        standalone (bool): kill all processes very rudely in case of failure (do not use this if you use this API) (default False) (deprecated)
        ignore_ambiguity (bool): ignore ambiguous rules and always take the first possible one (default False)
        snakemakepath (str): deprecated parameter whose value is ignored. Do not use.
        lock (bool): lock the working directory when executing the workflow (default True)
        unlock (bool): just unlock the working directory (default False)
        cleanup_metadata (list): just cleanup metadata of given list of output files (default None)
        cleanup_conda (bool): just cleanup unused conda environments (default False)
        cleanup_shadow (bool): just cleanup old shadow directories (default False)
        force_incomplete (bool): force the re-creation of incomplete files (default False)
        ignore_incomplete (bool): ignore incomplete files (default False)
        list_version_changes (bool): list output files with changed rule version (default False)
        list_code_changes (bool): list output files with changed rule code (default False)
        list_input_changes (bool): list output files with changed input files (default False)
        list_params_changes (bool): list output files with changed params (default False)
        list_untracked (bool): list files in the workdir that are not used in the workflow (default False)
        list_resources (bool): list resources used in the workflow (default False)
        summary (bool): list summary of all output files and their status (default False); see also detailed_summary
        detailed_summary (bool): list summary of all input and output files and their status, including extra info about the input and shell commands (default False)
        archive (str): archive workflow into the given tarball
        delete_all_output (bool): remove all files generated by the workflow (default False)
        delete_temp_output (bool): remove all temporary files generated by the workflow (default False)
        latency_wait (int): how many seconds to wait for an output file to appear after the execution of a job, e.g. to handle filesystem latency (default 3)
        wait_for_files (list): wait for given files to be present before executing the workflow
        print_compilation (bool): print the compilation of the snakefile (default False)
        debug (bool): allow to use the debugger within rules; incompatible with more than one core or cluster execution
        notemp (bool): ignore temp file flags, e.g. do not delete output files marked as temp after use (default False)
        keep_remote_local (bool): keep local copies of remote files (default False)
        nodeps (bool): ignore dependencies (default False)
        keep_target_files (bool): do not adjust the paths of given target files relative to the working directory.
        allowed_rules (set): restrict allowed rules to the given set. If None or empty, all rules are used.
        jobscript (str): path to a custom shell script template for cluster jobs (default None)
        greediness (float): set the greediness of scheduling. This value between 0 and 1 determines how careful jobs are selected for execution. The default value (0.5 if prioritytargets are used, 1.0 else) provides the best speed and still acceptable scheduling quality.
        no_hooks (bool): forwarded to Workflow.execute (default False)
        overwrite_shellcmd (str): a shell command that shall be executed instead of those given in the workflow. This is for debugging purposes only.
        updated_files (list): a list that will be filled with the files that are updated or created during the workflow execution
        log_handler (function): redirect snakemake output to this custom log handler, a function that takes a log message dictionary (see below) as its only argument (default None).
        keep_logger (bool): reuse the already configured logger, i.e. skip logger setup at the start and logger cleanup at the end (default False)
        max_jobs_per_second (int): maximal number of cluster/drmaa jobs per second, None to impose no limit (default None)
        max_status_checks_per_second (int): forwarded to Workflow.execute (default 100)
        restart_times (int): number of times to restart failing jobs (default 0)
        attempt (int): initial value of Job.attempt. This is intended for internal use only (default 1).
        verbose (bool): show additional debug output (default False)
        force_use_threads (bool): whether to force use of threads over processes; helpful if shared memory is full or unavailable (default False)
        use_conda (bool): create conda environments for each job (defined with conda directive of rules)
        use_singularity (bool): run jobs in singularity containers (if defined with singularity directive)
        singularity_args (str): additional arguments to pass to singularity
        conda_prefix (str): the directory in which conda environments will be created (default None)
        list_conda_envs (bool): list conda environments and their location on disk.
        singularity_prefix (str): the directory to which singularity images will be pulled (default None)
        shadow_prefix (str): prefix for shadow directories. The job-specific shadow directories will be created in $SHADOW_PREFIX/shadow/ (default None)
        create_envs_only (bool): if specified, only builds the conda environments specified for each job, then exits.
        mode (snakemake.common.Mode): execution mode
        wrapper_prefix (str): prefix for wrapper script URLs (default None)
        kubernetes (str): submit jobs to kubernetes, using the given namespace.
        kubernetes_envvars (list): environment variables that shall be passed to kubernetes jobs.
        container_image (str): Docker image to use, e.g., for kubernetes.
        default_remote_provider (str): default remote provider to use instead of local files (e.g. S3, GS)
        default_remote_prefix (str): prefix for default remote provider (e.g. name of the bucket).
        assume_shared_fs (bool): assume that cluster nodes share a common filesystem (default true).
        cluster_status (str): status command for cluster execution. If None, Snakemake will rely on flag files. Otherwise, it expects the command to return "success", "failure" or "running" when executing with a cluster jobid as single argument.
        export_cwl (str): Compile workflow to CWL and save to given file

    The log message dictionary for the log handler has the following entries:

        :level:
            the log level ("info", "error", "debug", "progress", "job_info")

        :level="info", "error" or "debug":
            :msg:
                the log message

        :level="progress":
            :done:
                number of already executed jobs

            :total:
                number of total jobs

        :level="job_info":
            :input:
                list of input files of a job

            :output:
                list of output files of a job

            :log:
                path to log file of a job

            :local:
                whether a job is executed locally (i.e. ignoring cluster)

            :msg:
                the job message

            :reason:
                the job reason

            :priority:
                the job priority

            :threads:
                the threads of the job

    Returns:
        bool: True if workflow execution was successful. False is returned
        (after logging an error) for invalid arguments, and also when an
        exception occurred during execution (the exception is printed).
    """
    # NOTE(review): resources/config/forcerun/until/omit_from/prioritytargets
    # use mutable defaults that are shared between calls. They are not
    # mutated in this function; confirm Workflow.execute does not mutate them.
    assert not immediate_submit or notemp, "immediate_submit has to be combined with notemp (it does not support temp file handling)"

    if updated_files is None:
        updated_files = list()

    # In cluster mode the cluster limits concurrency (via nodes); locally
    # the core count does. Lift whichever limit does not apply.
    if cluster or cluster_sync or drmaa:
        cores = sys.maxsize
    else:
        nodes = sys.maxsize

    if isinstance(cluster_config, str):
        # Loading configuration from one file is still supported for
        # backward compatibility
        cluster_config = [cluster_config]
    if cluster_config:
        # Load all configuration files
        configs = [load_configfile(f) for f in cluster_config]
        # Merge in the order as specified, overriding earlier values with
        # later ones
        cluster_config_content = configs[0]
        for other in configs[1:]:
            update_config(cluster_config_content, other)
    else:
        cluster_config_content = dict()

    run_local = not (cluster or cluster_sync or drmaa or kubernetes)
    if run_local and not dryrun:
        # clean up all previously recorded jobids.
        shell.cleanup()

    # force thread use for any kind of cluster
    use_threads = force_use_threads or (os.name != "posix") or cluster or cluster_sync or drmaa
    if not keep_logger:
        stdout = (
            (dryrun and not (printdag or printd3dag or printrulegraph)) or
            listrules or list_target_rules or list_resources
        )
        setup_logger(handler=log_handler,
                     quiet=quiet,
                     printreason=printreason,
                     printshellcmds=printshellcmds,
                     debug_dag=debug_dag,
                     nocolor=nocolor,
                     stdout=stdout,
                     debug=verbose,
                     use_threads=use_threads,
                     mode=mode)

    if greediness is None:
        greediness = 0.5 if prioritytargets else 1.0
    elif not (0 <= greediness <= 1.0):
        logger.error("Error: greediness must be a float between 0 and 1.")
        return False

    if not os.path.exists(snakefile):
        logger.error("Error: Snakefile \"{}\" not present.".format(snakefile))
        return False
    snakefile = os.path.abspath(snakefile)

    # Number of cluster submission mechanisms requested (bools sum as ints).
    cluster_mode = (cluster is not None) + (cluster_sync is not None) + (drmaa is not None)
    if cluster_mode > 1:
        logger.error("Error: cluster and drmaa args are mutually exclusive")
        return False
    if debug and (cores > 1 or cluster_mode):
        logger.error(
            "Error: debug mode cannot be used with more than one core or cluster execution.")
        return False

    # Config precedence: workflow config < configfile < config.
    overwrite_config = dict()
    if configfile:
        overwrite_config.update(load_configfile(configfile))
        configfile = os.path.abspath(configfile)
    if config:
        overwrite_config.update(config)
        if config_args is None:
            config_args = unparse_config(config)

    if workdir:
        olddir = os.getcwd()
        if not os.path.exists(workdir):
            logger.info(
                "Creating specified working directory {}.".format(workdir))
            os.makedirs(workdir)
        workdir = os.path.abspath(workdir)
        os.chdir(workdir)

    logger.setup_logfile()

    # Sentinel: stays None if Workflow construction never happens/fails.
    workflow = None
    try:
        # handle default remote provider
        _default_remote_provider = None
        if default_remote_provider is not None:
            try:
                rmt = importlib.import_module("snakemake.remote." +
                                              default_remote_provider)
            except ImportError as e:
                raise WorkflowError("Unknown default remote provider.") from e
            if rmt.RemoteProvider.supports_default:
                _default_remote_provider = rmt.RemoteProvider()
            else:
                raise WorkflowError("Remote provider {} does not (yet) support to "
                                    "be used as default provider.".format(
                                        default_remote_provider))

        workflow = Workflow(snakefile=snakefile,
                            jobscript=jobscript,
                            overwrite_shellcmd=overwrite_shellcmd,
                            overwrite_config=overwrite_config,
                            overwrite_workdir=workdir,
                            overwrite_configfile=configfile,
                            overwrite_clusterconfig=cluster_config_content,
                            config_args=config_args,
                            debug=debug,
                            use_conda=use_conda or list_conda_envs or cleanup_conda,
                            use_singularity=use_singularity,
                            conda_prefix=conda_prefix,
                            singularity_prefix=singularity_prefix,
                            shadow_prefix=shadow_prefix,
                            singularity_args=singularity_args,
                            mode=mode,
                            wrapper_prefix=wrapper_prefix,
                            printshellcmds=printshellcmds,
                            restart_times=restart_times,
                            attempt=attempt,
                            default_remote_provider=_default_remote_provider,
                            default_remote_prefix=default_remote_prefix,
                            run_local=run_local)
        success = True
        workflow.include(snakefile,
                         overwrite_first_rule=True,
                         print_compilation=print_compilation)
        workflow.check()

        if not print_compilation:
            if listrules:
                workflow.list_rules()
            elif list_target_rules:
                workflow.list_rules(only_targets=True)
            elif list_resources:
                workflow.list_resources()
            else:
                # handle subworkflows: a pre-configured recursive call that
                # the workflow can use to run them with the same settings
                subsnakemake = partial(snakemake,
                                       cores=cores,
                                       nodes=nodes,
                                       local_cores=local_cores,
                                       resources=resources,
                                       dryrun=dryrun,
                                       touch=touch,
                                       printreason=printreason,
                                       printshellcmds=printshellcmds,
                                       debug_dag=debug_dag,
                                       nocolor=nocolor,
                                       quiet=quiet,
                                       keepgoing=keepgoing,
                                       cluster=cluster,
                                       cluster_sync=cluster_sync,
                                       drmaa=drmaa,
                                       drmaa_log_dir=drmaa_log_dir,
                                       jobname=jobname,
                                       immediate_submit=immediate_submit,
                                       standalone=standalone,
                                       ignore_ambiguity=ignore_ambiguity,
                                       restart_times=restart_times,
                                       attempt=attempt,
                                       lock=lock,
                                       unlock=unlock,
                                       cleanup_metadata=cleanup_metadata,
                                       cleanup_conda=cleanup_conda,
                                       cleanup_shadow=cleanup_shadow,
                                       force_incomplete=force_incomplete,
                                       ignore_incomplete=ignore_incomplete,
                                       latency_wait=latency_wait,
                                       verbose=verbose,
                                       notemp=notemp,
                                       keep_remote_local=keep_remote_local,
                                       nodeps=nodeps,
                                       jobscript=jobscript,
                                       greediness=greediness,
                                       no_hooks=no_hooks,
                                       overwrite_shellcmd=overwrite_shellcmd,
                                       config=config,
                                       config_args=config_args,
                                       cluster_config=cluster_config,
                                       keep_logger=True,
                                       force_use_threads=use_threads,
                                       use_conda=use_conda,
                                       use_singularity=use_singularity,
                                       conda_prefix=conda_prefix,
                                       singularity_prefix=singularity_prefix,
                                       shadow_prefix=shadow_prefix,
                                       singularity_args=singularity_args,
                                       list_conda_envs=list_conda_envs,
                                       kubernetes=kubernetes,
                                       kubernetes_envvars=kubernetes_envvars,
                                       container_image=container_image,
                                       create_envs_only=create_envs_only,
                                       default_remote_provider=default_remote_provider,
                                       default_remote_prefix=default_remote_prefix,
                                       assume_shared_fs=assume_shared_fs,
                                       cluster_status=cluster_status,
                                       max_jobs_per_second=max_jobs_per_second,
                                       max_status_checks_per_second=max_status_checks_per_second)
                success = workflow.execute(
                    targets=targets,
                    dryrun=dryrun,
                    touch=touch,
                    cores=cores,
                    nodes=nodes,
                    local_cores=local_cores,
                    forcetargets=forcetargets,
                    forceall=forceall,
                    forcerun=forcerun,
                    prioritytargets=prioritytargets,
                    until=until,
                    omit_from=omit_from,
                    quiet=quiet,
                    keepgoing=keepgoing,
                    printshellcmds=printshellcmds,
                    printreason=printreason,
                    printrulegraph=printrulegraph,
                    printdag=printdag,
                    cluster=cluster,
                    cluster_sync=cluster_sync,
                    jobname=jobname,
                    drmaa=drmaa,
                    drmaa_log_dir=drmaa_log_dir,
                    kubernetes=kubernetes,
                    kubernetes_envvars=kubernetes_envvars,
                    container_image=container_image,
                    max_jobs_per_second=max_jobs_per_second,
                    max_status_checks_per_second=max_status_checks_per_second,
                    printd3dag=printd3dag,
                    immediate_submit=immediate_submit,
                    ignore_ambiguity=ignore_ambiguity,
                    stats=stats,
                    force_incomplete=force_incomplete,
                    ignore_incomplete=ignore_incomplete,
                    list_version_changes=list_version_changes,
                    list_code_changes=list_code_changes,
                    list_input_changes=list_input_changes,
                    list_params_changes=list_params_changes,
                    list_untracked=list_untracked,
                    list_conda_envs=list_conda_envs,
                    summary=summary,
                    archive=archive,
                    delete_all_output=delete_all_output,
                    delete_temp_output=delete_temp_output,
                    latency_wait=latency_wait,
                    wait_for_files=wait_for_files,
                    detailed_summary=detailed_summary,
                    nolock=not lock,
                    unlock=unlock,
                    resources=resources,
                    notemp=notemp,
                    keep_remote_local=keep_remote_local,
                    nodeps=nodeps,
                    keep_target_files=keep_target_files,
                    cleanup_metadata=cleanup_metadata,
                    cleanup_conda=cleanup_conda,
                    cleanup_shadow=cleanup_shadow,
                    subsnakemake=subsnakemake,
                    updated_files=updated_files,
                    allowed_rules=allowed_rules,
                    greediness=greediness,
                    no_hooks=no_hooks,
                    force_use_threads=use_threads,
                    create_envs_only=create_envs_only,
                    assume_shared_fs=assume_shared_fs,
                    cluster_status=cluster_status,
                    report=report,
                    export_cwl=export_cwl)

    except BrokenPipeError:
        # ignore this exception and stop. It occurs if snakemake output is piped into less and less quits before reading the whole output.
        # in such a case, snakemake shall stop scheduling and quit with error 1
        success = False
    except BaseException as ex:
        # Deliberately broad (includes KeyboardInterrupt): report the error
        # with workflow line maps when available and return False.
        linemaps = workflow.linemaps if workflow is not None else dict()
        print_exception(ex, linemaps)
        success = False
    finally:
        # Always restore cwd, release the lock and tear down logging, even
        # if reporting the exception above fails.
        if workdir:
            os.chdir(olddir)
        if workflow is not None and workflow.persistence:
            workflow.persistence.unlock()
        if not keep_logger:
            logger.cleanup()
    return success
def parse_resources(args): """Parse resources from args.""" resources = dict() if args.resources is not None: valid = re.compile("[a-zA-Z_]\w*$") for res in args.resources: try: res, val = res.split("=") except ValueError: raise ValueError( "Resources have to be defined as name=value pairs.") if not valid.match(res): raise ValueError( "Resource definition must start with a valid identifier.") try: val = int(val) except ValueError: raise ValueError( "Resource definiton must contain an integer after the identifier.") if res == "_cores": raise ValueError( "Resource _cores is already defined internally. Use a different name.") resources[res] = val return resources def parse_config(args): """Parse config from args.""" parsers = [int, float, eval, str] config = dict() if args.config is not None: valid = re.compile("[a-zA-Z_]\w*$") for entry in args.config: try: key, val = entry.split("=", 1) except ValueError: raise ValueError( "Config entries have to be defined as name=value pairs.") if not valid.match(key): raise ValueError( "Config entry must start with a valid identifier.") v = None for parser in parsers: try: v = parser(val) # avoid accidental interpretation as function if not callable(v): break except: pass assert v is not None config[key] = v return config def unparse_config(config): if not isinstance(config, dict): raise ValueError("config is not a dict") items = [] for key, value in config.items(): if isinstance(value, dict): raise ValueError("config may only be a flat dict") encoded = "'{}'".format(value) if isinstance(value, str) else value items.append("{}={}".format(key, encoded)) return items APPDIRS = None def get_appdirs(): global APPDIRS if APPDIRS is None: from appdirs import AppDirs APPDIRS = AppDirs("snakemake", "snakemake") return APPDIRS def get_profile_file(profile, file, return_default=False): dirs = get_appdirs() if os.path.isabs(profile): search_dirs = [os.path.dirname(profile)] profile = os.path.basename(profile) else: search_dirs = 
[os.getcwd(), dirs.user_config_dir, dirs.site_config_dir] get_path = lambda d: os.path.join(d, profile, file) for d in search_dirs: p = get_path(d) if os.path.exists(p): return p if return_default: return file return None def get_argument_parser(profile=None): """Generate and return argument parser.""" import configargparse from configargparse import YAMLConfigFileParser dirs = get_appdirs() config_files = [] if profile: if profile == "": print("Error: invalid profile name.", file=sys.stderr) exit(1) config_file = get_profile_file(profile, "config.yaml") if config_file is None: print("Error: profile given but no config.yaml found. " "Profile has to be given as either absolute path, relative " "path or name of a directory available in either " "{site} or {user}.".format( site=dirs.site_config_dir, user=dirs.user_config_dir), file=sys.stderr) exit(1) config_files = [config_file] parser = configargparse.ArgumentParser( description="Snakemake is a Python based language and execution " "environment for GNU Make-like workflows.", default_config_files=config_files, config_file_parser_class=YAMLConfigFileParser) group_exec = parser.add_argument_group("EXECUTION") group_exec.add_argument("target", nargs="*", default=None, help="Targets to build. May be rules or files.") group_exec.add_argument("--dryrun", "-n", action="store_true", help="Do not execute anything, and display what would be done. " "If you have a very large workflow, use --dryrun --quiet to just " "print a summary of the DAG of jobs.") group_exec.add_argument("--profile", help=""" Name of profile to use for configuring Snakemake. Snakemake will search for a corresponding folder in {} and {}. Alternatively, this can be an absolute or relative path. The profile folder has to contain a file 'config.yaml'. This file can be used to set default values for command line options in YAML format. For example, '--cluster qsub' becomes 'cluster: qsub' in the YAML file. Profiles can be obtained from