Source code for flow.environment

# Copyright (c) 2018 The Regents of the University of Michigan
# All rights reserved.
# This software is licensed under the BSD 3-Clause License.
"""Detection of compute environments.

This module provides the :class:`ComputeEnvironment` class, which can be
subclassed to automatically detect specific computational environments.

This enables the user to adjust their workflow based on the present
environment, e.g. for the adjustment of scheduler submission scripts.
"""
import importlib
import logging
import os
import re
import socket
import warnings
from functools import lru_cache

from signac.common import config

from .directives import (
    _FORK,
    _GET_EXECUTABLE,
    _MEMORY,
    _NGPU,
    _NP,
    _NRANKS,
    _OMP_NUM_THREADS,
    _PROCESSOR_FRACTION,
    _WALLTIME,
    _Directives,
)
from .errors import NoSchedulerError
from .scheduling.base import JobStatus
from .scheduling.fake_scheduler import FakeScheduler
from .scheduling.lsf import LSFScheduler
from .scheduling.pbs import PBSScheduler
from .scheduling.simple_scheduler import SimpleScheduler
from .scheduling.slurm import SlurmScheduler
from .util import config as flow_config

logger = logging.getLogger(__name__)


@lru_cache(maxsize=1)
def _cached_fqdn():
    """Return the fully qualified domain name.

    This value is cached because fetching the fully qualified domain name can
    be slow on macOS.
    """
    return socket.getfqdn()


def setup(py_modules, **attrs):
    """Set up user-defined environment modules.

    Use this function in place of :meth:`setuptools.setup` to not only install
    an environment's module, but also register it with the global signac
    configuration. Once registered, the environment is automatically
    imported when the :meth:`~flow.get_environment` function is called.

    Parameters
    ----------
    py_modules : list
        Names of the modules to install and register.
    \\*\\*attrs
        Keyword arguments forwarded to :func:`setuptools.setup`.

    Returns
    -------
    object
        Whatever :func:`setuptools.setup` returns.

    """
    warnings.warn(
        "The configuration key flow.environment_modules will be removed in signac-flow version "
        "0.21. Users should manually import user-defined environments instead.",
        FutureWarning,
    )
    # Imported lazily so that setuptools is only required when this
    # function is actually called.
    import setuptools
    from setuptools.command.install import install

    class InstallAndConfig(install):
        """Install command that also registers the modules in the signac config."""

        def run(self):
            super().run()
            cfg = config.read_config_file(config.FN_CONFIG)
            try:
                envs = cfg["flow"].as_list("environment_modules")
            except KeyError:
                envs = []
            # De-duplicate while preserving the order given by the caller, so
            # that the order written to the configuration file is
            # deterministic (iterating a set is not).
            new = [name for name in dict.fromkeys(py_modules) if name not in envs]
            if new:
                for name in new:
                    self.announce(
                        msg=f"registering module '{name}' in global signac configuration",
                        level=2,
                    )
                cfg.setdefault("flow", {})
                cfg["flow"]["environment_modules"] = envs + new
                cfg.write()

    return setuptools.setup(
        py_modules=py_modules, cmdclass={"install": InstallAndConfig}, **attrs
    )
class _ComputeEnvironmentType(type): """Metaclass used for :class:`~.ComputeEnvironment`. This metaclass automatically registers :class:`~.ComputeEnvironment` definitions, which enables the automatic determination of the present environment. The registry can be obtained from :func:`~.registered_environments`. """ def __init__(cls, name, bases, dct): if not hasattr(cls, "registry"): cls.registry = {} else: cls.registry[name] = cls super().__init__(name, bases, dct)
def template_filter(func):
    """Mark a function as a :class:`~.ComputeEnvironment` template filter.

    Apply this decorator to methods of a :class:`~.ComputeEnvironment`
    subclass that are used in that environment's templates. The function is
    tagged with a ``_flow_template_filter`` attribute and wrapped as a class
    method, which makes it available as a :ref:`jinja2 filter <jinja2:filters>`
    in templates rendered by a :class:`~.FlowProject` with that
    :class:`~.ComputeEnvironment`.

    Parameters
    ----------
    func : callable
        Function to decorate.

    Returns
    -------
    callable
        Decorated function.

    """
    func._flow_template_filter = True
    wrapped = classmethod(func)
    return wrapped
class ComputeEnvironment(metaclass=_ComputeEnvironmentType):
    """Define computational environments.

    The ComputeEnvironment class allows us to automatically determine
    specific environments in order to programmatically adjust workflows
    in different environments.

    The default method for the detection of a specific environment is to
    provide a regular expression matching the environment's hostname.
    For example, if the hostname is ``my-server.com``, one could identify the
    environment by setting the ``hostname_pattern`` to ``'my-server'``.
    """

    # Scheduler class instantiated by get_scheduler(); also consulted by
    # is_present() when no hostname pattern is set.
    scheduler_type = None
    # Regular expression matched (with re.match) against the host's FQDN.
    hostname_pattern = None
    # Extra flags appended to the flags passed to submit().
    submit_flags = None
    # Name of the script template (presumably resolved by the project's
    # template loader -- not visible in this module).
    template = "base_script.sh"
    # Command used by _get_mpi_prefix() to launch MPI ranks.
    mpi_cmd = "mpiexec"

    @classmethod
    def is_present(cls):
        """Determine whether this specific compute environment is present.

        The default method for environment detection is trying to match a
        hostname pattern or delegate the environment check to the associated
        scheduler type.

        Returns
        -------
        bool
            False if neither a hostname pattern nor a scheduler type is
            defined; otherwise the result of the hostname match or of the
            scheduler type's ``is_present`` check.

        """
        if cls.hostname_pattern is None:
            if cls.scheduler_type is None:
                return False
            return cls.scheduler_type.is_present()
        return re.match(cls.hostname_pattern, _cached_fqdn()) is not None

    @classmethod
    def get_scheduler(cls):
        """Return an environment-specific scheduler driver.

        The returned scheduler class provides a standardized interface to
        different scheduler implementations.

        Returns
        -------
        object
            A new instance of ``scheduler_type``.

        Raises
        ------
        :class:`~flow.errors.NoSchedulerError`
            If ``scheduler_type`` is missing or cannot be called (e.g. it is
            still the default ``None``).

        """
        try:
            return getattr(cls, "scheduler_type")()
        except (AttributeError, TypeError) as error:
            # Chain the original error so the underlying cause stays visible.
            raise NoSchedulerError(
                f"No scheduler defined for environment '{cls.__name__}'."
            ) from error

    @classmethod
    def submit(cls, script, flags=None, *args, **kwargs):
        r"""Submit a job submission script to the environment's scheduler.

        Scripts should be submitted to the environment, instead of directly
        to the scheduler to allow for environment specific post-processing.

        Parameters
        ----------
        script : str
            The script to submit.
        flags : list
            A list of additional flags to provide to the scheduler. The list
            is not modified. (Default value = None)
        \*args
            Positional arguments forwarded to the scheduler's submit method.
        \*\*kwargs
            Keyword arguments forwarded to the scheduler's submit method.

        Returns
        -------
        JobStatus.submitted or None
            Status of job, if submitted.

        Raises
        ------
        ValueError
            If any flag contains ``--job-name``.

        """
        # Work on a copy: extending the caller's list in place would make the
        # environment flags accumulate across repeated submissions.
        flags = [] if flags is None else list(flags)
        env_flags = getattr(cls, "submit_flags", [])
        if env_flags:
            flags.extend(env_flags)
        # parse the flag to check for --job-name
        for flag in flags:
            if "--job-name" in flag:
                raise ValueError('Assignment of "--job-name" is not supported.')
        # Hand off the actual submission to the scheduler
        if cls.get_scheduler().submit(script, *args, flags=flags, **kwargs):
            return JobStatus.submitted
        return None

    @classmethod
    def add_args(cls, parser):
        """Add arguments related to this compute environment to an argument parser.

        The base implementation adds nothing; subclasses may override it.

        Parameters
        ----------
        parser : :class:`argparse.ArgumentParser`
            The argument parser where arguments will be added.

        """
        pass

    @classmethod
    def get_config_value(cls, key, default=flow_config._GET_CONFIG_VALUE_NONE):
        """Request a value from the user's configuration.

        This method should be used whenever values need to be provided
        that are specific to a user's environment, e.g. account names.

        When a key is not configured and no default value is provided,
        a :class:`~flow.errors.SubmitError` will be raised and the user will
        be prompted to add the missing key to their configuration.

        Please note, that the key will be automatically expanded to
        be specific to this environment definition. For example, a
        key should be ``'account'``, not ``'MyEnvironment.account'``.

        Parameters
        ----------
        key : str
            The environment specific configuration key.
        default : str
            A default value in case the key cannot be found
            within the user's configuration.

        Returns
        -------
        object
            The value or default value.

        Raises
        ------
        :class:`~flow.errors.SubmitError`
            If the key is not in the user's configuration
            and no default value is provided.

        """
        return flow_config.require_config_value(key, ns=cls.__name__, default=default)

    @classmethod
    def _get_omp_prefix(cls, operation):
        """Get the OpenMP prefix based on the ``omp_num_threads`` directive.

        Parameters
        ----------
        operation : :class:`flow.project._JobOperation`
            The operation to be prefixed.

        Returns
        -------
        str
            The prefix to be added to the operation's command.

        """
        return "export OMP_NUM_THREADS={}; ".format(
            operation.directives["omp_num_threads"]
        )

    @classmethod
    def _get_mpi_prefix(cls, operation, parallel):
        """Get the MPI prefix based on the ``nranks`` directives.

        Parameters
        ----------
        operation : :class:`flow.project._JobOperation`
            The operation to be prefixed.
        parallel : bool
            If True, operations are assumed to be executed in parallel, which
            means that the number of total tasks is the sum of all tasks
            instead of the maximum number of tasks. This argument is required
            and is not used by this base implementation.

        Returns
        -------
        str
            The prefix to be added to the operation's command, or an empty
            string if the ``nranks`` directive is unset or falsy.

        """
        if operation.directives.get("nranks"):
            return "{} -n {} ".format(cls.mpi_cmd, operation.directives["nranks"])
        return ""

    @template_filter
    def get_prefix(cls, operation, parallel=False, mpi_prefix=None, cmd_prefix=None):
        """Template filter generating a command prefix from directives.

        Parameters
        ----------
        operation : :class:`flow.project._JobOperation`
            The operation to be prefixed.
        parallel : bool
            If True, operations are assumed to be executed in parallel, which means
            that the number of total tasks is the sum of all tasks instead of the
            maximum number of tasks. Default is set to False.
        mpi_prefix : str
            User defined mpi_prefix string. Default is set to None.
            This will be deprecated and removed in the future.
        cmd_prefix : str
            User defined cmd_prefix string. Default is set to None.
            This will be deprecated and removed in the future.

        Returns
        -------
        str
            The prefix to be added to the operation's command.

        """
        prefix = ""
        if operation.directives.get("omp_num_threads"):
            prefix += cls._get_omp_prefix(operation)
        # A user-supplied MPI prefix takes precedence over the generated one.
        if mpi_prefix:
            prefix += mpi_prefix
        else:
            prefix += cls._get_mpi_prefix(operation, parallel)
        if cmd_prefix:
            prefix += cmd_prefix
        # if cmd_prefix and if mpi_prefix for backwards compatibility
        # Can change to get them from directives for future
        return prefix

    @classmethod
    def _get_default_directives(cls):
        """Return the default set of directives for this environment.

        Returns
        -------
        :class:`~flow.directives._Directives`
            A new directives container built from the module's standard
            directive definitions.

        """
        return _Directives(
            (
                _GET_EXECUTABLE(),
                _FORK,
                _MEMORY,
                _NGPU,
                _NP,
                _NRANKS,
                _OMP_NUM_THREADS,
                _PROCESSOR_FRACTION,
                _WALLTIME,
            )
        )
class StandardEnvironment(ComputeEnvironment):
    """Fallback environment that reports itself as present on every host."""

    @classmethod
    def is_present(cls):
        """Report that this environment is present.

        Unlike the base implementation, no hostname pattern or scheduler is
        consulted.

        Returns
        -------
        bool
            Always True.

        """
        return True
class TestEnvironment(ComputeEnvironment):
    """Environment used for testing.

    Instead of talking to a real scheduler, the test environment prints a
    mocked submission script and the submission commands to screen. This
    makes it possible to test the generation of job submission scripts in
    environments without a real scheduler.
    """

    # Scheduler driver used in place of a real scheduler.
    scheduler_type = FakeScheduler
class SimpleSchedulerEnvironment(ComputeEnvironment):
    """Environment for hosts that use the simple-scheduler scheduler."""

    # Scheduler driver and the script template that goes with it.
    scheduler_type = SimpleScheduler
    template = "simple_scheduler.sh"
class DefaultPBSEnvironment(ComputeEnvironment):
    """Default environment for clusters with a PBS scheduler."""

    scheduler_type = PBSScheduler
    template = "pbs.sh"

    @classmethod
    def add_args(cls, parser):
        """Register the PBS submission options on the parser.

        Adds ``--hold``, ``--after``, ``--job-output`` and ``--no-copy-env``
        after the options of the parent class.

        Parameters
        ----------
        parser : :class:`argparse.ArgumentParser`
            The argument parser where arguments will be added.

        """
        super().add_args(parser)
        # (flag, keyword arguments) pairs, registered in this order.
        options = (
            (
                "--hold",
                {"action": "store_true", "help": "Submit jobs, but put them on hold."},
            ),
            (
                "--after",
                {
                    "type": str,
                    "help": "Schedule this job to be executed after "
                    "completion of a cluster job with this id.",
                },
            ),
            (
                "--job-output",
                {
                    "type": str,
                    "help": (
                        "What to name the job output file. "
                        "If omitted, uses the scheduler default name. "
                        "Both stdout and stderr will be combined."
                    ),
                },
            ),
            (
                "--no-copy-env",
                {
                    "action": "store_true",
                    "help": "Do not copy current environment variables into compute node environment.",
                },
            ),
        )
        for flag, settings in options:
            parser.add_argument(flag, **settings)
class DefaultSlurmEnvironment(ComputeEnvironment):
    """Default environment for clusters with a SLURM scheduler."""

    scheduler_type = SlurmScheduler
    template = "slurm.sh"

    @classmethod
    def add_args(cls, parser):
        """Register the SLURM submission options on the parser.

        Adds ``--hold``, ``--after`` and ``--job-output`` after the options
        of the parent class.

        Parameters
        ----------
        parser : :class:`argparse.ArgumentParser`
            The argument parser where arguments will be added.

        """
        super().add_args(parser)
        # (flag, keyword arguments) pairs, registered in this order.
        options = (
            (
                "--hold",
                {"action": "store_true", "help": "Submit jobs, but put them on hold."},
            ),
            (
                "--after",
                {
                    "type": str,
                    "help": "Schedule this job to be executed after "
                    "completion of a cluster job with this id.",
                },
            ),
            (
                "--job-output",
                {
                    "type": str,
                    "help": (
                        "What to name the job output file. "
                        "If omitted, uses the scheduler default name. "
                        "Both stdout and stderr will be combined."
                    ),
                },
            ),
        )
        for flag, settings in options:
            parser.add_argument(flag, **settings)
class DefaultLSFEnvironment(ComputeEnvironment):
    """Default environment for clusters with a LSF scheduler."""

    scheduler_type = LSFScheduler
    template = "lsf.sh"

    @classmethod
    def add_args(cls, parser):
        """Register the LSF submission options on the parser.

        Adds ``--hold``, ``--after`` and ``--job-output`` after the options
        of the parent class.

        Parameters
        ----------
        parser : :class:`argparse.ArgumentParser`
            The argument parser where arguments will be added.

        """
        super().add_args(parser)
        # (flag, keyword arguments) pairs, registered in this order.
        options = (
            (
                "--hold",
                {"action": "store_true", "help": "Submit jobs, but put them on hold."},
            ),
            (
                "--after",
                {
                    "type": str,
                    "help": "Schedule this job to be executed after "
                    "completion of a cluster job with this id.",
                },
            ),
            (
                "--job-output",
                {
                    "type": str,
                    "help": (
                        "What to name the job output file. "
                        "If omitted, uses the scheduler default name. "
                        "Both stdout and stderr will be combined."
                    ),
                },
            ),
        )
        for flag, settings in options:
            parser.add_argument(flag, **settings)
def _import_configured_environments():
    """Import the environment modules listed in the signac configuration.

    The module names are read from the ``environment_modules`` key of the
    ``flow`` section. If that section or key is missing, nothing is imported
    and no warning is issued. A module that fails with :class:`ImportError`
    is logged as a warning and skipped. When the key is configured, a
    :class:`FutureWarning` about its upcoming removal is issued after the
    import attempts.
    """
    cfg = config.load_config(config.FN_CONFIG)
    # Only the configuration lookup is guarded: a KeyError raised while a
    # user module is being imported must not be mistaken for a missing key
    # and silently swallowed.
    try:
        module_names = cfg["flow"].as_list("environment_modules")
    except KeyError:
        return
    for name in module_names:
        try:
            importlib.import_module(name)
        except ImportError as error:
            logger.warning(error)
    warnings.warn(
        "The configuration key flow.environment_modules will be removed in signac-flow version "
        "0.21. Users should manually import user-defined environments instead.",
        FutureWarning,
    )
def registered_environments(import_configured=True):
    """List every environment class currently in the registry.

    Parameters
    ----------
    import_configured : bool
        Whether to first import the environments specified in the flow
        configuration. (Default value = True)

    Returns
    -------
    list
        The registered environment classes, in registry order.

    """
    if import_configured:
        _import_configured_environments()
    registry = ComputeEnvironment.registry
    return [*registry.values()]
def get_environment(test=False, import_configured=True):
    """Attempt to detect the present environment.

    The selection proceeds in this order:

    1. If ``test`` is true, the :class:`~.TestEnvironment` is returned.
    2. If the ``SIGNAC_FLOW_ENVIRONMENT`` environment variable is set, the
       registered environment with that class name is returned.
    3. The first registered environment with a truthy ``DEBUG`` attribute.
    4. The first environment, in reversed order of definition, whose
       :meth:`~.ComputeEnvironment.is_present` method returns True.
    5. The :class:`~.StandardEnvironment` as a fallback.

    Parameters
    ----------
    test : bool
        Whether to return the TestEnvironment. (Default value = False)
    import_configured : bool
        Whether to import environments specified in the flow configuration.
        (Default value = True)

    Returns
    -------
    :class:`~.ComputeEnvironment`
        The detected environment class.

    Raises
    ------
    ValueError
        If ``SIGNAC_FLOW_ENVIRONMENT`` names an environment that is not
        registered.

    """
    if test:
        return TestEnvironment

    # Collect the candidates and log their names.
    candidates = registered_environments(import_configured=import_configured)
    listing = "\n\t".join(str(env.__name__) for env in candidates)
    logger.debug("List of registered environments:\n\t{}".format(listing))

    # An explicit request through the environment variable wins.
    requested = os.environ.get("SIGNAC_FLOW_ENVIRONMENT")
    if requested:
        by_name = next((env for env in candidates if env.__name__ == requested), None)
        if by_name is None:
            raise ValueError(f"Unknown environment '{requested}'.")
        return by_name

    # Next, any environment flagged with DEBUG.
    flagged = next((env for env in candidates if getattr(env, "DEBUG", False)), None)
    if flagged is not None:
        logger.debug(f"Select environment '{flagged.__name__}'; DEBUG=True.")
        return flagged

    # Then the most recently defined environment that reports itself present.
    for candidate in reversed(candidates):
        if not candidate.is_present():
            continue
        logger.debug(f"Select environment '{candidate.__name__}'; is present.")
        return candidate

    # Nothing matched: fall back to the standard environment.
    return StandardEnvironment