Source code for flow.project

# Copyright (c) 2019 The Regents of the University of Michigan
# All rights reserved.
# This software is licensed under the BSD 3-Clause License.
"""Workflow definition with the FlowProject.

The FlowProject is a signac Project that allows the user to define a workflow.
"""
import argparse
import contextlib
import functools
import inspect
import json
import logging
import multiprocessing
import os
import random
import re
import shlex
import subprocess
import sys
import textwrap
import threading
import time
import warnings
from collections import Counter, defaultdict
from copy import deepcopy
from enum import IntFlag
from hashlib import md5, sha1
from itertools import chain, count, groupby, islice
from multiprocessing import Event, Pool, TimeoutError, cpu_count
from multiprocessing.pool import ThreadPool

import cloudpickle
import jinja2
import jsonschema
import signac
from jinja2 import TemplateNotFound as Jinja2TemplateNotFound
from signac.filterparse import parse_filter_arg

from .aggregates import (
    _AggregatesCursor,
    _AggregateStore,
    _AggregateStoresCursor,
    _JobAggregateCursor,
    aggregator,
    get_aggregate_id,
)
from .directives import _document_directive
from .environment import ComputeEnvironment, get_environment, registered_environments
from .errors import (
    ConfigKeyError,
    FlowProjectDefinitionError,
    NoSchedulerError,
    SubmitError,
    TemplateError,
    UserConditionError,
    UserOperationError,
)
from .hooks import _Hooks
from .labels import _is_label_func, classlabel, label, staticlabel
from .render_status import _render_status
from .scheduling.base import ClusterJob, JobStatus
from .util import config as flow_config
from .util import template_filters
from .util.misc import (
    _add_cwd_to_environment_pythonpath,
    _bidict,
    _cached_partial,
    _get_parallel_executor,
    _positive_int,
    _roundrobin,
    _switch_to_directory,
    _to_hashable,
    _TrackGetItemDict,
    tqdm,
)
from .util.translate import abbreviate, shorten

logger = logging.getLogger(__name__)


# The TEMPLATE_HELP can be shown with the --template-help option available to all
# command line subcommands that use the templating system.
TEMPLATE_HELP = """Execution and submission scripts are generated with jinja2 template files.
Standard files are shipped with the package, but may be replaced or extended with
custom templates provided within a project.
The default template directory can be configured with the 'template_dir' configuration
variable, for example in the project configuration file. The current template directory is:
{template_dir}

All template variables can be placed within a template using the standard jinja2
syntax, e.g., the project root directory can be written as: {{{{ project.path }}}}.
The available template variables are:
{template_vars}

Filter functions can be used to format template variables in a specific way.
For example: {{{{ project.id | capitalize }}}}.

The available filters are:
{filters}"""

_FMT_SCHEDULER_STATUS = {
    JobStatus.unknown: "U",
    JobStatus.registered: "R",
    JobStatus.inactive: "I",
    JobStatus.submitted: "S",
    JobStatus.held: "H",
    JobStatus.queued: "Q",
    JobStatus.active: "A",
    JobStatus.error: "E",
    JobStatus.group_registered: "GR",
    JobStatus.group_inactive: "GI",
    JobStatus.group_submitted: "GS",
    JobStatus.group_held: "GH",
    JobStatus.group_queued: "GQ",
    JobStatus.group_active: "GA",
    JobStatus.group_error: "GE",
    JobStatus.placeholder: " ",
}


class IgnoreConditions(IntFlag):
    """Flags that determine which conditions are used to determine job eligibility."""

    # The __invert__ operator must be defined since IntFlag simply performs an
    # integer bitwise not on the underlying enum value, which is problematic in
    # two's-complement arithmetic. What we want is to only flip valid bits.
    def __invert__(self):
        # Compute the largest number of bits used to represent one of the flags
        # so that we can XOR the appropriate number.
        max_bits = len(bin(max(elem.value for elem in type(self)))) - 2
        return self.__class__((2**max_bits - 1) ^ self._value_)

    NONE = 0
    """Check all conditions."""

    PRE = 1
    """Ignore preconditions."""

    POST = 2
    """Ignore postconditions."""

    ALL = PRE | POST
    """Ignore all conditions."""

    def __str__(self):
        return {
            IgnoreConditions.PRE: "pre",
            IgnoreConditions.POST: "post",
            IgnoreConditions.ALL: "all",
            IgnoreConditions.NONE: "none",
        }[self]
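The custom ``__invert__`` only flips the valid condition bits, so inverting a flag always yields another defined flag rather than a negative integer. A minimal sketch of the resulting behavior (assuming ``flow.project`` is importable):

.. code-block:: python

    from flow.project import IgnoreConditions

    # Only the PRE and POST bits participate in inversion.
    assert ~IgnoreConditions.PRE == IgnoreConditions.POST
    assert ~IgnoreConditions.ALL == IgnoreConditions.NONE
    assert ~IgnoreConditions.NONE == IgnoreConditions.ALL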
class _IgnoreConditionsConversion(argparse.Action): def __init__(self, option_strings, dest, nargs=None, **kwargs): if nargs is not None: raise ValueError("nargs not allowed") super().__init__(option_strings, dest, **kwargs) def __call__(self, parser, namespace, values, option_string=None): setattr(namespace, self.dest, getattr(IgnoreConditions, values.upper())) class _condition: # This counter should be incremented each time a "never" condition # is created, and the value should be used as the tag for that # condition to ensure that no pair of "never" conditions # are found to be equal by the graph detection algorithm. current_arbitrary_tag = 0 def __init__(self, condition, tag=None): """Add tag to differentiate built-in conditions during graph detection.""" if tag is None: try: tag = condition.__code__.co_code except AttributeError: logger.warning(f"Condition {condition} could not autogenerate tag.") condition._flow_tag = tag self.condition = condition @classmethod def isfile(cls, filename): """Determine if the specified file exists for the job(s).""" def _isfile(*jobs): return all(job.isfile(filename) for job in jobs) return cls(_isfile, "isfile_" + filename) @classmethod def true(cls, key): """Evaluate if a document key is True for the job(s). Returns True if the specified key is present in the job document(s) and evaluates to True. """ def _document(*jobs): return all(job.document.get(key, False) for job in jobs) return cls(_document, "true_" + key) @classmethod def false(cls, key): """Evaluate if a document key is False for the job(s). Returns True if the specified key is present in the job document(s) and evaluates to False. """ def _no_document(*jobs): return all(not job.document.get(key, False) for job in jobs) return cls(_no_document, "false_" + key) @classmethod def never(cls, func): """Return False.""" cls.current_arbitrary_tag += 1 return cls(lambda _: False, str(cls.current_arbitrary_tag))(func) @classmethod def not_(cls, condition): """Return ``not condition(*jobs)`` for the provided condition function.""" def _not(*jobs): return not condition(*jobs) return cls(_not, b"not_" + condition.__code__.co_code) def _create_all_metacondition(condition_dict, *other_funcs): """Generate metacondition requiring all provided conditions to be true. This function generates an aggregate metaconditions that requires *all* provided conditions to be met. The resulting metacondition is constructed with appropriate information for graph detection. """ condition_list = [c for f in other_funcs for c in condition_dict[f]] def _flow_metacondition(*jobs): return all(c(*jobs) for c in condition_list) _flow_metacondition._composed_of = condition_list return _flow_metacondition def _make_bundles(operations, size=None): """Slice an iterable of operations into equally sized bundles. This utility function splits an iterable of operations into equally sized bundles. The final bundle may be smaller than the specified size. Parameters ---------- operations : iterable Iterable of operations. size : int Size of bundles. (Default value = None) Yields ------ list Bundles of operations with specified size. """ if size == 0: size = None operations = iter(operations) while True: bundle = list(islice(operations, size)) if bundle: yield bundle else: break class _JobOperation: """Class containing execution information for one group and one job. 
The execution or submission of a :class:`~.FlowGroup` uses a passed-in command which can either be a string or function with no arguments that returns a shell executable command. The shell executable command won't be used if it is determined that the group can be executed without forking. .. note:: This class is used by the :class:`~.FlowGroup` class for the execution and submission process and should not be instantiated by users themselves. Parameters ---------- id : str The id of this _JobOperation instance. The id should be unique. name : str The name of the _JobOperation. jobs : tuple of :class:`~signac.job.Job` The jobs associated with this operation. cmd : callable or str The command that executes this operation. Can be a callable that when evaluated returns a string. directives : dict A `dict` object of additional parameters that provide instructions on how to execute this operation, e.g., specifically required resources. user_directives : set Keys in ``directives`` that correspond to user-specified directives that are not part of the environment's standard directives. """ def __init__(self, id, name, jobs, cmd, directives, user_directives): self._id = id self.name = name self._jobs = jobs if not (callable(cmd) or isinstance(cmd, str)): raise ValueError("cmd must be a callable or string.") self._cmd = cmd # We use a special dictionary that tracks all keys that have been # evaluated by the template engine and compare them to those explicitly # set by the user. See also comment below. self.directives = _TrackGetItemDict(directives) # Keys which were explicitly set by the user, but are not evaluated by # the template engine are cause for concern and might hint at a bug in # the template script or ill-defined directives. We are therefore # keeping track of all keys set by the user and check whether they have # been evaluated by the template script engine later. self.directives._keys_set_by_user = user_directives def __str__(self): aggregate_id = get_aggregate_id(self._jobs) return f"{self.name}({aggregate_id})" def __repr__(self): return "{type}(name='{name}', jobs='{jobs}', cmd={cmd}, directives={directives})".format( type=type(self).__name__, name=self.name, jobs="(" + ", ".join(map(repr, self._jobs)) + ")", cmd=repr(self.cmd), directives=self.directives, ) def __hash__(self): return hash(self.id) def __eq__(self, other): return self.id == other.id @property def id(self): return self._id @property def cmd(self): if callable(self._cmd): # We allow cmd to be 'lazy' or an unevaluated function because # in cases where a user uses the Python API without specifying # a project entrypoint, running many operations is still valid. # If we need to fork this will fail to generate a command and # error, but not until then. If we don't fork then nothing errors, # and the user gets the expected result. return self._cmd() return self._cmd class _SubmissionJobOperation(_JobOperation): r"""Class containing submission information for one group and one job. This class extends :class:`_JobOperation` to include a set of groups that will be executed via the "run" command. These groups are known at submission time. Parameters ---------- \*args Passed to the constructor of :class:`_JobOperation`. eligible_operations : list A list of :class:`_JobOperation` that will be executed when this submitted job is executed. operations_with_unmet_preconditions : list A list of :class:`_JobOperation` that will not be executed in the first pass of :meth:`FlowProject.run` due to unmet preconditions. 
These operations may be executed in subsequent iterations of the run loop. operations_with_met_postconditions : list A list of :class:`_JobOperation` that will not be executed in the first pass of :meth:`FlowProject.run` because all postconditions are met. These operations may be executed in subsequent iterations of the run loop. \*\*kwargs Passed to the constructor of :class:`_JobOperation`. """ def __init__( self, *args, eligible_operations=None, operations_with_unmet_preconditions=None, operations_with_met_postconditions=None, **kwargs, ): super().__init__(*args, **kwargs) if eligible_operations is None: eligible_operations = [] self.eligible_operations = eligible_operations if operations_with_unmet_preconditions is None: operations_with_unmet_preconditions = [] self.operations_with_unmet_preconditions = operations_with_unmet_preconditions if operations_with_met_postconditions is None: operations_with_met_postconditions = [] self.operations_with_met_postconditions = operations_with_met_postconditions class _FlowCondition: """A _FlowCondition represents a condition as a function of a signac job. The ``__call__()`` method of a _FlowCondition object may return either True or False, representing whether the condition is met or not. This can be used to build a graph of conditions and operations. Parameters ---------- callback : callable A callable with one positional argument (the job). """ def __init__(self, callback): self._callback = callback def __call__(self, jobs): try: return self._callback(*jobs) except Exception as error: assert len(jobs) == 1 raise UserConditionError( "An exception was raised while evaluating the condition {name} " "for job {jobs}.".format( name=self._callback.__name__, jobs=", ".join(map(str, jobs)) ) ) from error def __hash__(self): return hash(self._callback) def __eq__(self, other): return self._callback == other._callback
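As ``_FlowCondition.__call__`` above shows, an exception raised inside a user-supplied condition is re-raised as a :class:`~.UserConditionError` that names the condition and the job(s). A hedged sketch of how such a condition is attached through the public decorators (the project class, condition, and operation names are illustrative):

.. code-block:: python

    from flow import FlowProject


    class Project(FlowProject):
        pass


    def fragile_condition(job):
        # Any exception raised here (e.g. a KeyError) is wrapped in a
        # UserConditionError when the condition is evaluated.
        return job.doc["missing_key"] > 0


    @Project.pre(fragile_condition)
    @Project.operation
    def compute(job):
        pass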
class BaseFlowOperation:
    """A :class:`~.BaseFlowOperation` represents a data space operation acting on any job.

    Every :class:`~.BaseFlowOperation` is associated with a specific command.

    Preconditions (pre) and postconditions (post) can be used to trigger an
    operation only when certain conditions are met. Conditions are unary
    callables, which expect an instance of job as their first and only
    positional argument and return either True or False.

    An operation is considered "eligible" for execution when all preconditions
    are met and when at least one of the postconditions is not met.
    Preconditions are always met when the list of preconditions is empty.
    Postconditions are never met when the list of postconditions is empty.

    .. note::
        This class should not be instantiated directly.

    Parameters
    ----------
    pre : sequence of callables
        List of preconditions.
    post : sequence of callables
        List of postconditions.
    """

    def __init__(self, pre=None, post=None):
        if pre is None:
            pre = []
        if post is None:
            post = []
        self._preconditions = [_FlowCondition(cond) for cond in pre]
        self._postconditions = [_FlowCondition(cond) for cond in post]

    def _eligible(self, aggregate, ignore_conditions=IgnoreConditions.NONE):
        """Determine eligibility of an aggregate.

        An aggregate is eligible when all preconditions are true and at least
        one postcondition is false, or corresponding conditions are ignored.

        Parameters
        ----------
        aggregate : tuple of :class:`~signac.job.Job`
            The aggregate of signac jobs.
        ignore_conditions : :class:`~.IgnoreConditions`
            Specify if preconditions and/or postconditions are to be ignored
            when determining eligibility. The default is
            :class:`IgnoreConditions.NONE`.

        Returns
        -------
        bool
            Whether the aggregate is eligible.
        """
        if not isinstance(ignore_conditions, IgnoreConditions):
            raise ValueError(
                "The ignore_conditions argument of FlowProject.run() "
                "must be a member of class IgnoreConditions."
            )
        # len(self._preconditions) check for speed optimization
        met_preconditions = (
            (len(self._preconditions) == 0)
            or (ignore_conditions & IgnoreConditions.PRE)
            or all(cond(aggregate) for cond in self._preconditions)
        )
        if met_preconditions and len(self._postconditions) > 0:
            unmet_postconditions = (ignore_conditions & IgnoreConditions.POST) or any(
                not cond(aggregate) for cond in self._postconditions
            )
        else:
            unmet_postconditions = True
        return met_preconditions and unmet_postconditions

    def _complete(self, jobs):
        """Check if all postconditions are met."""
        if len(self._postconditions) > 0:
            return all(cond(jobs) for cond in self._postconditions)
        return False
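A brief sketch of these eligibility rules through the public decorators (class and operation names are illustrative): an operation with no postconditions is never considered complete, while one whose postcondition holds is no longer eligible.

.. code-block:: python

    from flow import FlowProject


    class Project(FlowProject):
        pass


    # Eligible while the postcondition is unmet; complete (and therefore no
    # longer eligible) once job.doc.processed evaluates to True.
    @Project.post(lambda job: job.doc.get("processed", False))
    @Project.operation
    def process(job):
        job.doc.processed = True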
class FlowCmdOperation(BaseFlowOperation):
    """An operation that executes a shell command.

    When an operation has the ``FlowProject.operation(cmd=True)`` directive
    specified, it is instantiated as a :class:`~.FlowCmdOperation`. The
    operation should be a function of one or more positional arguments that
    are instances of :class:`~signac.job.Job`. The command (cmd) may either be
    a callable that expects one or more instances of :class:`~signac.job.Job`
    as positional arguments and returns a string containing valid shell
    commands, or the string of commands itself. In either case, the resulting
    string may contain any attributes of the job (or jobs) placed in curly
    braces, which will then be substituted by Python string formatting.

    .. note::
        This class should not be instantiated directly.

    Parameters
    ----------
    cmd : str or callable
        The command to execute the operation. Callable values will be
        provided one or more positional arguments that are instances of
        :class:`~signac.job.Job`. String values will be formatted with
        ``cmd.format(jobs=jobs)`` where ``jobs`` is a tuple of
        :class:`~signac.job.Job`, or ``cmd.format(jobs=jobs, job=job)`` if
        only one job is provided.
    pre : sequence of callables
        List of preconditions.
    post : sequence of callables
        List of postconditions.
    """

    def __init__(self, cmd, pre=None, post=None):
        super().__init__(pre=pre, post=post)
        self._cmd = cmd

    def __str__(self):
        return f"{type(self).__name__}(cmd='{self._cmd}')"
    def __call__(self, *jobs):
        """Return the command formatted with the supplied job(s)."""
        return self._cmd(*jobs) if callable(self._cmd) else self._cmd
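For illustration, a hedged sketch of an operation registered with ``cmd=True``; the returned string is executed by the shell rather than called as a Python function (the project class and operation name are illustrative):

.. code-block:: python

    from flow import FlowProject


    class Project(FlowProject):
        pass


    @Project.operation(cmd=True)
    def echo_job(job):
        # The returned string is run as a shell command.
        return f"echo 'processing {job.id}'"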
class FlowOperation(BaseFlowOperation):
    """An operation that executes a Python function.

    All operations without the ``FlowProject.operation(cmd=True)`` directive
    use this class. The callable ``op_func`` should be a function of one or
    more instances of :class:`~signac.job.Job`.

    .. note::
        This class should not be instantiated directly.

    Parameters
    ----------
    op_func : callable
        A callable function of ``*jobs``.
    pre : sequence of callables
        List of preconditions.
    post : sequence of callables
        List of postconditions.
    """

    def __init__(self, op_func, pre=None, post=None):
        super().__init__(pre=pre, post=post)
        self._op_func = op_func

    def __str__(self):
        """Return string representing operation."""
        return f"{type(self).__name__}(op_func='{self._op_func}')"
    def __call__(self, *jobs):
        r"""Call the operation on the provided jobs.

        Parameters
        ----------
        \*jobs : One or more instances of :class:`~signac.job.Job`.
            The jobs passed to the operation.

        Returns
        -------
        object
            The result of the operation function.
        """
        return self._op_func(*jobs)
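A short sketch contrasting a per-job operation with an aggregate operation (names are illustrative); the aggregate variant receives every job of the aggregate as positional arguments, mirroring the examples in the condition docstrings below.

.. code-block:: python

    from flow import FlowProject, aggregator


    class Project(FlowProject):
        pass


    @Project.operation
    def initialize(job):
        # Acts on a single job.
        job.doc.initialized = True


    @Project.operation(aggregator=aggregator())
    def summarize(*jobs):
        # Acts on an aggregate containing all jobs of the project.
        print(f"{len(jobs)} jobs aggregated")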
class FlowGroupEntry:
    """A FlowGroupEntry registers operations for inclusion into a :class:`FlowGroup`.

    Application developers should not directly instantiate this class, but
    use :meth:`~.FlowProject.make_group` instead.

    Operation functions can be marked for inclusion into a :class:`FlowGroup`
    by decorating the functions with a corresponding :class:`FlowGroupEntry`.
    If the operation requires group specific directives, calling the
    :class:`FlowGroupEntry` with the keyword argument ``directives`` allows
    setting directives exclusively for the group. Doing this overrides the
    default directives specified by :meth:`FlowProject.operation`.

    Parameters
    ----------
    name : str
        The name of the :class:`FlowGroup` to be created.
    project : flow.FlowProject
        The project the group is associated with.
    submit_options : str
        The :meth:`FlowProject.run` options to pass when submitting the group.
        These will be included in all submissions. Submissions use run
        commands to execute.
    run_options : str
        The options to pass to ``entrypoint exec`` when running the group.
        Specifying this will cause the operation to be forked even if it
        otherwise would run in the current Python interpreter.
    group_aggregator : :class:`~.aggregator`
        aggregator object associated with the :class:`FlowGroup`
        (Default value = None).
    """

    def __init__(
        self, name, project, submit_options="", run_options="", group_aggregator=None
    ):
        self.name = name
        self._project = project
        self.submit_options = submit_options
        self.run_options = run_options
        self.group_aggregator = group_aggregator
    def __call__(self, func=None, /, *, directives=None):
        """Add the function into the group's operations.

        This call operator allows the class to be used as a decorator.

        Parameters
        ----------
        func : callable
            The function to decorate.
        directives : dict
            Directives to use for resource requests and execution.
            The directives specified in this decorator are only applied when
            executing the operation through the :class:`FlowGroup`.
            To apply directives to an individual operation executed outside of
            the group, see :meth:`.FlowProject.operation`.

        Returns
        -------
        func
            The decorated function.
        """
        if directives is not None:
            warnings.warn(
                "The current directives (e.g. nranks, np) are deprecated as of flow 0.27. "
                "When updating flow please look at the documentation for the new style "
                "(https://docs.signac.io/en/latest/cluster_submission.html#submission-directives).",
                DeprecationWarning,
            )
        if func is None:
            return functools.partial(self._internal_call, directives=directives)
        return self._internal_call(func, directives=directives)
    def _internal_call(self, func, /, *, directives):
        if not any(
            func == op_func for _, op_func in self._project._OPERATION_FUNCTIONS
        ):
            raise FlowProjectDefinitionError(
                f"Cannot add function '{func}' to group without making the function an "
                f"operation. Add @MyProjectClass.operation below group decorator."
            )
        if self.name in func._flow_groups[self._project]:
            raise FlowProjectDefinitionError(
                f"Cannot reregister operation '{func}' with the group '{self.name}'."
            )
        func._flow_groups[self._project].add(self.name)

        if directives is None:
            return func
        if hasattr(func, "_flow_group_operation_directives"):
            func._flow_group_operation_directives[self.name] = directives
        else:
            func._flow_group_operation_directives = {self.name: directives}
        return func

    def _set_directives(self, func, directives):
        if hasattr(func, "_flow_group_operation_directives"):
            if self.name in func._flow_group_operation_directives:
                raise FlowProjectDefinitionError(
                    "Cannot set directives because directives already exist "
                    f"for operation '{func}' in group '{self.name}'."
                )
            func._flow_group_operation_directives[self.name] = directives
        else:
            func._flow_group_operation_directives = {self.name: directives}
[docs] class FlowGroup: """A :class:`~.FlowGroup` represents a subset of a workflow for a project. A :class:`FlowGroup` is associated with one or more instances of :class:`~.BaseFlowOperation`. Examples -------- In the example below, the directives will be ``{'nranks': 4}`` for op1 and ``{'nranks': 2, 'executable': 'python3'}`` for op2. .. code-block:: python group = FlowProject.make_group(name='example_group') @group(directives={"nranks": 4}) @FlowProject.operation({"nranks": 2, "executable": "python3"}) def op1(job): pass @group @FlowProject.operation({"nranks": 2, "executable": "python3"}) def op2(job): pass Parameters ---------- name : str The name of the group to be used when calling from the command line. operations : dict A dictionary of operations where the keys are operation names and each value is a :class:`~.BaseFlowOperation`. operation_directives : dict A dictionary of additional parameters that provide instructions on how to execute a particular operation, e.g., specifically required resources. Operation names are keys and the dictionaries of directives are values. If an operation does not have directives specified, then the directives of the singleton group containing that operation are used. To prevent this, set the directives to an empty dictionary for that operation. submit_options : str The :meth:`FlowProject.run` options to pass when submitting the group. These will be included in all submissions. Submissions use run commands to execute. run_options : str The options to pass to ``entrypoint exec`` when running the group. Specifying this will cause the operation to be forked even if it otherwise would run in the current Python interpreter. """ MAX_LEN_ID = 100 def __init__( self, name, operations=None, operation_directives=None, submit_options="", run_options="", ): if operation_directives is not None: warnings.warn( "The current directives (e.g. nranks, np) are deprecated as of flow 0.27. " "When updating flow please look at the documentation for the new style " "(https://docs.signac.io/en/latest/cluster_submission.html#submission-directives).", DeprecationWarning, ) self.name = name self.submit_options = submit_options self.run_options = run_options # Requires Python >=3.6: dict must be ordered to ensure consistent # pretend submission output for templates. self.operations = {} if operations is None else dict(operations) if operation_directives is None: self.operation_directives = {} else: self.operation_directives = operation_directives def _set_entrypoint_item(self, entrypoint, directives, key, default, jobs): """Set a value (executable, path) for entrypoint in command. Order of priority is the operation directives specified and then the project specified value. """ entrypoint[key] = directives.get(key, entrypoint.get(key, default)) if callable(entrypoint[key]): entrypoint[key] = entrypoint[key](*jobs) def _determine_entrypoint(self, entrypoint, directives, jobs): """Get the entrypoint for creating a _JobOperation. If path cannot be determined, then raise a RuntimeError since we do not know where to point to. """ entrypoint = entrypoint.copy() self._set_entrypoint_item( entrypoint, directives, "executable", sys.executable, jobs ) # If a path is not provided, default to the path to the file where the # FlowProject (subclass) is defined. 
# We are assuming that all the jobs belong to the same project default_path = inspect.getfile(jobs[0]._project.__class__) self._set_entrypoint_item(entrypoint, directives, "path", default_path, jobs) return "{} {}".format(entrypoint["executable"], entrypoint["path"]).lstrip() def _resolve_directives(self, name, defaults, env): all_directives = env._get_default_directives() if name in self.operation_directives: all_directives.update(self.operation_directives[name]) else: all_directives.update(defaults.get(name, {})) return all_directives def _submit_cmd(self, entrypoint, ignore_conditions, jobs): entrypoint = self._determine_entrypoint(entrypoint, {}, jobs) cmd = f"{entrypoint} run -o {self.name}" cmd = cmd if jobs is None else cmd + f" -j {get_aggregate_id(jobs)}" options = self.submit_options if ignore_conditions != IgnoreConditions.NONE: options += " --ignore-conditions=" + str(ignore_conditions) return " ".join((cmd, options)).strip() def _run_cmd(self, entrypoint, operation_name, operation, directives, jobs): if isinstance(operation, FlowCmdOperation): return operation(*jobs).lstrip() entrypoint = self._determine_entrypoint(entrypoint, directives, jobs) cmd = f"{entrypoint} exec {operation_name} {get_aggregate_id(jobs)} {self.run_options}" return cmd.strip() def __iter__(self): yield from self.operations.values() def __repr__(self): return ( f"{type(self).__name__}(name={repr(self.name)}, operations='" f"{' '.join(list(self.operations))}'," f"operation_directives={self.operation_directives}, " f"submit_options={repr(self.submit_options)}, run_options={repr(self.run_options)})" ) def _eligible(self, aggregate, ignore_conditions=IgnoreConditions.NONE): """Determine if at least one operation is eligible. A :class:`~.FlowGroup` is eligible for execution if at least one of its associated operations is eligible. Parameters ---------- aggregate : tuple of :class:`~signac.job.Job` The aggregate of signac jobs. ignore_conditions : :class:`~.IgnoreConditions` Specify if preconditions and/or postconditions are to be ignored while checking eligibility. The default is :class:`IgnoreConditions.NONE`. Returns ------- bool Whether the group is eligible. """ return any(op._eligible(aggregate, ignore_conditions) for op in self) def _complete(self, jobs): """Check if postconditions are met for all operations in the group. Parameters ---------- jobs : tuple The signac job handles. Returns ------- bool Whether the group is complete (all contained operations are complete). """ return all(op._complete(jobs) for op in self)
    def add_operation(self, name, operation, directives=None):
        """Add an operation to the :class:`~.FlowGroup`.

        Parameters
        ----------
        name : str
            The name of the operation.
        operation : :class:`~.BaseFlowOperation`
            The workflow operation to add to the :class:`~.FlowGroup`.
        directives : dict
            The operation specific directives. (Default value = None)
        """
        self.operations[name] = operation
        if directives is not None:
            self.operation_directives[name] = directives
    def isdisjoint(self, group):
        """Return whether two groups are disjoint.

        Groups are disjoint if they do not share any common operations.

        Parameters
        ----------
        group : :class:`~.FlowGroup`
            The other :class:`~.FlowGroup` to compare to.

        Returns
        -------
        bool
            Returns ``True`` if ``group`` and ``self`` share no operations,
            otherwise returns ``False``.
        """
        return set(self).isdisjoint(set(group))
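A hedged sketch of two groups built with :meth:`~.FlowProject.make_group`; because they register no common operations, the resulting :class:`~.FlowGroup` objects are disjoint. All names are illustrative, and the comment assumes the ``groups`` mapping exposed on FlowProject instances.

.. code-block:: python

    from flow import FlowProject


    class Project(FlowProject):
        pass


    analysis = Project.make_group(name="analysis")
    plotting = Project.make_group(name="plotting")


    @analysis
    @Project.operation
    def analyze(job):
        pass


    @plotting
    @Project.operation
    def plot(job):
        pass


    # The two groups share no operations, so (assuming project.groups)
    # project.groups["analysis"].isdisjoint(project.groups["plotting"]) is True.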
def _generate_id(self, aggregate, operation_name=None): """Generate a unique id which identifies this group and job(s). The generated value is used to identify interactions with the scheduler. Parameters ---------- aggregate : tuple of :class:`signac.job.Job` The aggregate of signac jobs. operation_name : str Operation name defining the unique id. (Default value = None) Returns ------- str The unique id. """ project = aggregate[0]._project # The full name is designed to be truly unique for each job-group. if operation_name is None: op_string = "".join(sorted(list(self.operations))) else: op_string = operation_name aggregate_id = get_aggregate_id(aggregate) full_name = f"{project.path}%{aggregate_id}%{op_string}" # The job_op_id is a hash computed from the unique full name. job_op_id = md5(full_name.encode("utf-8")).hexdigest() # The actual job id is then constructed from a readable part and the # job_op_id, ensuring that the job-op is still somewhat identifiable, # but guaranteed to be unique. The readable name is based on the # project id, aggregate id, and operation name. All names and the id # itself are restricted in length to guarantee that the id does not get # too long. max_len = self.MAX_LEN_ID - len(job_op_id) if max_len < len(job_op_id): raise ValueError(f"Value for MAX_LEN_ID is too small ({self.MAX_LEN_ID}).") separator = getattr(project._environment, "JOB_ID_SEPARATOR", "/") readable_name = "{project}{sep}{aggregate_id}{sep}{op_string}{sep}".format( sep=separator, project=project.__class__.__name__[:12], aggregate_id=aggregate_id, op_string=op_string[:12], )[:max_len] # By appending the unique job_op_id, we ensure that each id is truly unique. return readable_name + job_op_id def _create_submission_job_operation( self, entrypoint, default_directives, jobs, ignore_conditions_on_execution=IgnoreConditions.NONE, ): """Create a _JobOperation object from the :class:`~.FlowGroup`. Creates a _JobOperation for use in submitting and scripting. Parameters ---------- entrypoint : dict The path and executable, if applicable, to point to for execution. default_directives : dict The default directives to use for the operations. This is to allow for user specified groups to 'inherit' directives from ``default_directives``. If no defaults are desired, the argument can be set to an empty dictionary. This must be done explicitly, however. jobs : tuple of :class:`~signac.job.Job` The jobs that the :class:`~._JobOperation` is based on. ignore_conditions_on_execution : :class:`~.IgnoreConditions` Specify if preconditions and/or postconditions are to be ignored while checking eligibility during execution (after submission). The default is :class:`IgnoreConditions.NONE`. Returns ------- :class:`~._SubmissionJobOperation` Returns a :class:`~._SubmissionJobOperation` for submitting the group. The :class:`~._JobOperation` will have directives that have been collected appropriately from its contained operations. """ unevaluated_cmd = _cached_partial( self._submit_cmd, entrypoint=entrypoint, jobs=jobs, ignore_conditions=ignore_conditions_on_execution, ) def _get_run_ops(ignore_ops, additional_ignores_flag): """Get all runnable operations. Returns operations that match the combination of the conditions required by ``_create_submission_job_operation`` and the ignored flags, and remove operations in the ``ignore_ops`` list. Parameters ---------- ignore_ops : iterable Operations to ignore. 
additional_ignores_flag : :class:`~.IgnoreConditions` An additional set of ignore flags combined with the ignore flags used for execution. Returns ------- list of :class:`_JobOperation` Runnable operations. """ ignore_ops = set(ignore_ops) return [ op for op in self._create_run_job_operations( entrypoint=entrypoint, default_directives=default_directives, jobs=jobs, ignore_conditions=ignore_conditions_on_execution | additional_ignores_flag, ) if op not in ignore_ops ] submission_directives = self._get_submission_directives( default_directives, jobs ) eligible_operations = _get_run_ops([], IgnoreConditions.NONE) operations_with_unmet_preconditions = _get_run_ops( eligible_operations, IgnoreConditions.PRE ) operations_with_met_postconditions = _get_run_ops( eligible_operations, IgnoreConditions.POST ) submission_job_operation = _SubmissionJobOperation( self._generate_id(jobs), self.name, jobs, cmd=unevaluated_cmd, directives=dict(submission_directives), user_directives=set(submission_directives.user_keys), eligible_operations=eligible_operations, operations_with_unmet_preconditions=operations_with_unmet_preconditions, operations_with_met_postconditions=operations_with_met_postconditions, ) return submission_job_operation def _create_run_job_operations( self, entrypoint, default_directives, jobs, ignore_conditions=IgnoreConditions.NONE, ): """Create _JobOperation object(s) from the :class:`~.FlowGroup`. Yields a _JobOperation for each contained operation given proper conditions are met. Parameters ---------- entrypoint : dict The path and executable, if applicable, to point to for execution. default_directives : dict The default directives to use for the operations. This is to allow for user-specified groups to inherit directives from ``default_directives``. If no defaults are desired, the argument must be explicitly set to an empty dictionary. jobs : tuple of :class:`~signac.job.Job` The jobs that the :class:`~._JobOperation` is based on. ignore_conditions : :class:`~.IgnoreConditions` Specify if preconditions and/or postconditions are to be ignored when determining eligibility check. The default is :class:`IgnoreConditions.NONE`. Returns ------- Iterator[_JobOperation] Iterator of eligible instances of :class:`~._JobOperation`. """ # Assuming all the jobs belong to the same FlowProject env = jobs[0]._project._environment for operation_name, operation in self.operations.items(): if operation._eligible(jobs, ignore_conditions): directives = self._resolve_directives( operation_name, default_directives, env ) directives.evaluate(jobs) # Return an unevaluated command to make evaluation lazy and # reduce side effects in callable FlowCmdOperations. unevaluated_cmd = _cached_partial( self._run_cmd, entrypoint=entrypoint, operation_name=operation_name, operation=operation, directives=directives, jobs=jobs, ) job_op = _JobOperation( self._generate_id(jobs, operation_name), operation_name, jobs, cmd=unevaluated_cmd, directives=dict(directives), user_directives=set(directives.user_keys), ) # Get the prefix, and if it's non-empty, set the fork directive # to True since we must launch a separate process. Override # the command directly. prefix = jobs[0]._project._environment.get_prefix(job_op) if prefix != "" or self.run_options != "": job_op.directives["fork"] = True job_op._cmd = f"{prefix} {job_op.cmd}" yield job_op def _get_submission_directives(self, default_directives, jobs): """Get the combined resources for submission. No checks are done to mitigate inappropriate aggregation of operations. 
        This can lead to poor utilization of computing resources.
        """
        env = jobs[0]._project._environment
        operation_names = list(self.operations.keys())
        # The first operation's directives are evaluated, then all other
        # operations' directives are applied as updates with aggregate=True
        directives = self._resolve_directives(
            operation_names[0], default_directives, env
        )
        directives.evaluate(jobs)
        for name in operation_names[1:]:
            # get directives for operation
            directives.update(
                self._resolve_directives(name, default_directives, env),
                aggregate=True,
                jobs=jobs,
            )
        return directives
class _FlowProjectClass(type): """Metaclass for the FlowProject class.""" def __new__(metacls, name, bases, namespace): cls = type.__new__(metacls, name, bases, dict(namespace)) # All operation functions are registered with the operation() # classmethod, which is intended to be used as a decorator function. # _OPERATION_FUNCTIONS is a list of tuples of the operation name and # the operation function. In addition, preconditions and postconditions # are registered with the class. cls._OPERATION_FUNCTIONS = [] cls._OPERATION_PRECONDITIONS = defaultdict(list) cls._OPERATION_POSTCONDITIONS = defaultdict(list) cls._OPERATION_HOOK_REGISTRY = defaultdict(lambda: defaultdict(list)) # All label functions are registered with the label() classmethod, # which is intended to be used as decorator function. The # _LABEL_FUNCTIONS dict contains the function as key and the label name # as value, or None to use the default label name. cls._LABEL_FUNCTIONS = {} # Give the class a preconditions and postconditions class that are # aware of the class they are in. cls.pre = cls._setup_preconditions_class(parent_class=cls) cls.post = cls._setup_postconditions_class(parent_class=cls) # Give the class an operation register object cls.operation = cls._setup_operation_object(parent_class=cls) # All groups are registered with the function returned by the # make_group classmethod. In contrast to operations and labels, the # make_group classmethod does not serve as the decorator, the functor # it returns does. The _GROUPS list records the groups created and # their passed parameters for later initialization. The _GROUP_NAMES # set stores whether a group name has already been used. cls._GROUPS = [] cls._GROUP_NAMES = set() cls.operation_hooks = cls._setup_hooks_object(parent_class=cls) return cls @staticmethod def _setup_preconditions_class(parent_class): class pre(_condition): """Define and evaluate preconditions for operations. A precondition is a function accepting one or more jobs as positional arguments (``*jobs``) that must evaluate to True for this operation to be eligible for execution. For example: .. code-block:: python @Project.pre(lambda job: not job.doc.get('hello')) @Project.operation def hello(job): print('hello', job) job.doc.hello = True @Project.pre(lambda *jobs: all("hi_all" not in job.doc for job in jobs)) @Project.operation(aggregator=aggregator()) def hi_all(*jobs): print('hi', jobs) for job in jobs: job.doc.hi_all = True The *hello* operation would only execute if the 'hello' key in the job document does not evaluate to True. Similarly, the *hi_all* operation would execute only if the 'hi_all' key is not present in all of the jobs passed. An optional tag may be associated with the condition. These tags are used by :meth:`~.detect_operation_graph` when comparing conditions for equality. The tag defaults to the bytecode of the function. .. tip:: Use ``job.cached_statepoint`` for the best performance in preconditions that depend on the job's statepoint. """ _parent_class = parent_class def __call__(self, func): # Have to traverse the mro to ensure that func is already an operation and that # self.condition is not. operation_functions = [ func for name, func in self._parent_class._collect_operations() ] if func not in operation_functions: raise FlowProjectDefinitionError( "Conditions must come after (above) @FlowProject.operation." ) if self.condition in operation_functions: raise FlowProjectDefinitionError( "Operation functions cannot be used as preconditions." 
) self._parent_class._OPERATION_PRECONDITIONS[func].insert( 0, self.condition ) return func @classmethod def copy_from(cls, *other_funcs): """Copy preconditions from other operation(s). True if and only if all preconditions of other operation function(s) are met. """ return cls( _create_all_metacondition( cls._parent_class._collect_preconditions(), *other_funcs ) ) @classmethod def after(cls, *other_funcs): """Precondition to run an operation after other operations. True if and only if all postconditions of other operation function(s) are met. """ operation_functions = [ operation[1] for operation in cls._parent_class._collect_operations() ] if not all( condition in operation_functions for condition in other_funcs ): raise FlowProjectDefinitionError( "The arguments to pre.after must be operations." ) return cls( _create_all_metacondition( cls._parent_class._collect_postconditions(), *other_funcs ) ) return pre @staticmethod def _setup_postconditions_class(parent_class): class post(_condition): """Define and evaluate postconditions for operations. A postcondition is a function accepting one or more jobs as positional arguments (``*jobs``) that must evaluate to True for this operation to be considered complete. For example: .. code-block:: python @Project.post(lambda job: job.doc.get('bye')) @Project.operation def bye(job): print('bye', job) job.doc.bye = True @Project.post(lambda *jobs: all("bye_all" in job.doc for job in jobs)) @Project.operation(aggregator=aggregator()) def bye_all(*jobs): print('bye', jobs) for job in jobs: job.doc.bye_all = True The *bye* operation would be considered complete and therefore no longer eligible for execution once the 'bye' key in the job document evaluates to True. Similarly, the *bye_all* operation would be considered complete and therefore no longer eligible for execution only if the 'bye_all' key is present in all of the jobs passed. An optional tag may be associated with the condition. These tags are used by :meth:`~.detect_operation_graph` when comparing conditions for equality. The tag defaults to the bytecode of the function. """ _parent_class = parent_class def __call__(self, func): # Have to traverse the mro to ensure that func is already an operation and that # self.condition is not. operation_functions = [ func for name, func in self._parent_class._collect_operations() ] if func not in operation_functions: raise FlowProjectDefinitionError( "Conditions must come after (above) @FlowProject.operation." ) if self.condition in operation_functions: raise FlowProjectDefinitionError( "Operation functions cannot be used as postconditions." ) self._parent_class._OPERATION_POSTCONDITIONS[func].insert( 0, self.condition ) return func @classmethod def copy_from(cls, *other_funcs): """Copy postconditions from other operation(s). True if and only if all postconditions of other operation function(s) are met. """ return cls( _create_all_metacondition( cls._parent_class._collect_postconditions(), *other_funcs ) ) return post @staticmethod def _setup_operation_object(parent_class): class OperationRegister: """Add operation functions to the class workflow definition. This object is designed to be used as a decorator, for example: .. code-block:: python @FlowProject.operation def hello(job): print('Hello', job) Directives can also be specified by using :meth:`FlowProject.operation`. .. code-block:: python @FlowProject.operation({"nranks": 4}) def mpi_hello(job): print("hello") Parameters ---------- func : callable The function to add to the workflow. 
name : str The operation name. Uses the name of the function if None. (Default value = None) cmd : bool, optional, keyword-only Whether the decorated function returns a shell executable string or not. When ``True``, the returned string is executed by the shell. Defaults to ``False``. with_job : bool, optional, keyword-only Whether to change directories to the job workspace when running the job. Defaults to ``False``. directives : dict, optional, keyword-only Directives to use for resource requests and execution. aggregator : flow.aggregator, optional, keyword-only The aggregator to use for the operation. Default value uses aggregator of size one (i.e. individual jobs). Returns ------- callable The operation function. """ _parent_class = parent_class def __call__( self, func=None, name=None, *, cmd=False, with_job=False, directives=None, aggregator=None, ): if isinstance(func, str): return lambda op: self._internal_call( op, name=func, cmd=cmd, with_job=with_job, directives=directives, op_aggregator=aggregator, ) if func is None: return lambda op: self._internal_call( op, name=name, cmd=cmd, with_job=with_job, directives=directives, op_aggregator=aggregator, ) return self._internal_call( func, name=name, cmd=cmd, with_job=with_job, directives=directives, op_aggregator=aggregator, ) def _internal_call( self, func, name, *, cmd, with_job, directives, op_aggregator ): if func in chain( *self._parent_class._OPERATION_PRECONDITIONS.values(), *self._parent_class._OPERATION_POSTCONDITIONS.values(), ): raise FlowProjectDefinitionError( "A condition function cannot be used as an operation." ) if cmd: setattr(func, "_flow_cmd", True) if with_job: func = self._with_job(func) # Store directives if directives is not None: func._flow_directives = directives if name is None: name = func.__name__ for ( registered_name, registered_func, ) in self._parent_class._OPERATION_FUNCTIONS: if name == registered_name: raise FlowProjectDefinitionError( f"An operation with name '{name}' is already registered." ) if func is registered_func: raise FlowProjectDefinitionError( "An operation with this function is already registered." ) if name in self._parent_class._GROUP_NAMES: raise FlowProjectDefinitionError( f"A group with name '{name}' is already registered." ) if not getattr(func, "_flow_aggregate", False): default_aggregator = aggregator.groupsof(1) if op_aggregator is None: op_aggregator = default_aggregator elif op_aggregator != default_aggregator: if getattr(func, "_flow_with_job", False): raise FlowProjectDefinitionError( "The with_job option cannot be used with aggregation." ) func._flow_aggregate = op_aggregator # Append the name and function to the class registry self._parent_class._OPERATION_FUNCTIONS.append((name, func)) self._parent_class._GROUPS.append( FlowGroupEntry(name=name, project=self._parent_class) ) if not hasattr(func, "_flow_groups"): func._flow_groups = {} func._flow_groups[self._parent_class] = {name} return func def _with_job(self, func): base_aggregator = aggregator.groupsof(1) if getattr(func, "_flow_aggregate", base_aggregator) != base_aggregator: raise FlowProjectDefinitionError( "The with_job keyword argument cannot be used with aggregation." 
) @functools.wraps(func) def decorated(*jobs): with jobs[0] as job: if getattr(func, "_flow_cmd", False): return ( f'trap "cd $(pwd)" EXIT && cd {job.path} && {func(job)}' ) else: return func(job) decorated._flow_with_job = True decorated._flow_cmd = getattr(func, "_flow_cmd", False) return decorated _directives_to_document = ( ComputeEnvironment._get_default_directives()._directive_definitions.values() ) OperationRegister.__doc__ += textwrap.indent( "\n\n**Supported Directives:**\n\n" + "\n\n".join( _document_directive(directive) for directive in _directives_to_document ), " " * 16, ) return OperationRegister() @staticmethod def _setup_hooks_object(parent_class): class _HooksRegister: """Add hooks to an operation. This object is designed to be used as a decorator. The example below shows an operation level decorator that prints the operation name and job id at the start of the operation execution. .. code-block:: python def start_hook(operation_name, job): print(f"Starting operation {operation_name} on job {job.id}.") @FlowProject.operation_hooks.on_start(start_hook) @FlowProject.operation def foo(job): pass A hook is a function that is called at specific points during the execution of a job operation. In the example above, the ``start_hook`` hook function is executed before the operation **foo** runs. Hooks can also run after an operation finishes, when an operation exits with error, or when an operation exits without error. The available triggers are ``on_start``, ``on_exit``, ``on_exception``, and ``on_success`` which run when the operation starts, completes, fails, and succeeds respectively. Parameters ---------- hook_func : callable The function that will be executed at a specified point. trigger : string The point when a hook operation is executed. """ _parent_class = parent_class def __init__(self, hook_func, trigger): self._hook_func = hook_func self._hook_trigger = trigger @classmethod def on_start(cls, hook_func): """Add a hook function triggered before an operation starts.""" return cls(hook_func, trigger="on_start") @classmethod def on_exit(cls, hook_func): """Add a hook function triggered after the operation exits. The hook is triggered regardless of whether the operation exits with or without an error. """ return cls(hook_func, trigger="on_exit") @classmethod def on_success(cls, hook_func): """Add a hook function triggered after the operation exits without error.""" return cls(hook_func, trigger="on_success") @classmethod def on_exception(cls, hook_func): """Add a hook function triggered after the operation exits with an error.""" return cls(hook_func, trigger="on_exception") def __call__(self, func): """Add the decorated function to the operation hook registry. Parameters ---------- func : callable The operation function associated with the hook function. """ self._parent_class._OPERATION_HOOK_REGISTRY[func][ self._hook_trigger ].append(self._hook_func) return func return _HooksRegister def _config_value_as_bool(value): # Function to interpret a configobj bool-like value as a boolean. if isinstance(value, str): if value.lower() in {"true", "on", "yes", "1"}: return True elif value.lower() in {"false", "off", "no", "0"}: return False else: raise ValueError("Invalid boolean config value.") return bool(value)
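Putting the registration machinery above together, a hedged sketch of a small workflow that chains two operations with ``pre.after`` and attaches an ``on_success`` hook. All names are illustrative, and the hook signature follows the ``on_start`` example shown in the hooks docstring above.

.. code-block:: python

    from flow import FlowProject


    class Project(FlowProject):
        pass


    def log_success(operation_name, job):
        print(f"{operation_name} succeeded for job {job.id}")


    @Project.post.isfile("raw.txt")
    @Project.operation
    def generate(job):
        with open(job.fn("raw.txt"), "w") as file:
            file.write("data")


    # 'analyze' becomes eligible only once all postconditions of 'generate' are met.
    @Project.operation_hooks.on_success(log_success)
    @Project.pre.after(generate)
    @Project.post.true("analyzed")
    @Project.operation
    def analyze(job):
        job.doc.analyzed = True


    if __name__ == "__main__":
        Project().main()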
[docs] class FlowProject(signac.Project, metaclass=_FlowProjectClass): """A signac project class specialized for workflow management. This class is used to define, execute, and submit workflows based on operations and conditions. Users typically interact with this class through its command line interface. This is a typical example of how to use this class: .. code-block:: python @FlowProject.operation def hello(job): print('hello', job) FlowProject().main() Parameters ---------- path : str, optional The project directory. By default, the current working directory (Default value = None). environment : :class:`flow.environment.ComputeEnvironment` An environment to use for scheduler submission. If ``None``, the environment is automatically identified. The default is ``None``. entrypoint : dict A dictionary with two possible keys: ``'executable'`` and ``'path'``. The path represents the location of the script file (the script file must call :meth:`FlowProject.main`). The executable represents the location of the Python interpreter used for the execution of :class:`~.BaseFlowOperation` that are Python functions. """ def __init__(self, path=None, environment=None, entrypoint=None): super().__init__(path=path) # Initialize the local config. # TODO: In signac 2.0 we will not allow config modification after # project initialization. The flow config is already effectively # immutable since it is an internal variable that is not exposed to the # user in any way. Once signac no longer relies on configobj and stops # supporting in-place config modification, flow can make use of the # signac Project config dictionary directly and that can be updated by # any flow config APIs. For now, we store the flow config separately to # avoid any side effects associated with modifying instances of # signac.project._ProjectConfig. self._flow_config = { **flow_config._FLOW_CONFIG_DEFAULTS, **self._config.get("flow", {}), } self._flow_config["eligible_jobs_max_lines"] = int( self._flow_config["eligible_jobs_max_lines"] ) self._flow_config["status_performance_warn_threshold"] = float( self._flow_config["status_performance_warn_threshold"] ) jsonschema.validate( self._flow_config, flow_config._FLOW_SCHEMA, format_checker=jsonschema.Draft7Validator.FORMAT_CHECKER, ) self._is_buffered = False self._jobs_cursor = None # Associate this class with a compute environment. self._environment = environment or get_environment() # Assign variables that give script location information self._entrypoint = {} if entrypoint is None else entrypoint # The standard local template directory is a directory called 'templates' within # the project root directory. This directory may be specified with the 'template_dir' # configuration variable. self._template_dir = os.path.join( self.path, self._config.get("template_dir", "templates") ) self._template_environment_ = {} # Setup execution hooks self._project_hooks = _Hooks() self._operation_hooks = defaultdict(_Hooks) # Register all label functions with this project instance. self._label_functions = {} self._register_labels() # Register all operation functions with this project instance. self._operations = {} self._register_operations() # Register all groups and aggregates with this project instance. 
self._groups = {} self._group_to_aggregate_store = _bidict() self._register_groups() def __iter__(self): """Provide a cached view of jobs while in a buffered state.""" if self._is_buffered: return iter(self._jobs_cursor) else: return super().__iter__() def __len__(self): """Provide a cached view of jobs while in a buffered state.""" if self._is_buffered: return len(self._jobs_cursor._ids) else: return super().__len__() def _contains_job_id(self, job_id): """Provide a cached view of jobs while in a buffered state.""" if self._is_buffered: return job_id in self._jobs_cursor._id_set else: return super()._contains_job_id(job_id) def _setup_template_environment(self): """Set up the jinja2 template environment. The templating system is used to generate templated scripts for the script() and _submit_operations() / submit() function and the corresponding command line subcommands. """ environment_modules = [ cls.__module__ for cls in registered_environments() if not cls.__module__.startswith("flow.environments") ] # Templates are searched in the local template directory first, then in additionally # installed packages, then in the main package 'templates' directory. extra_packages = [] for env in environment_modules: try: extra_packages.append(jinja2.PackageLoader(env, "templates")) except ImportError as error: logger.warning(f"Unable to load template from package '{error.name}'.") except ValueError as error: logger.warning( f"Unable to load template from package. Original Error '{error}'." ) load_envs = ( [jinja2.FileSystemLoader(self._template_dir)] + extra_packages + [jinja2.PackageLoader("flow", "templates")] ) template_environment = jinja2.Environment( loader=jinja2.ChoiceLoader(load_envs), trim_blocks=True, lstrip_blocks=True, extensions=[TemplateError], ) # Setup standard filters that can be used to format context variables. 
template_environment.filters[ "format_timedelta" ] = template_filters.format_timedelta template_environment.filters["format_memory"] = template_filters.format_memory template_environment.filters["identical"] = template_filters.identical template_environment.filters["with_np_offset"] = template_filters.with_np_offset template_environment.filters["calc_tasks"] = template_filters.calc_tasks template_environment.filters["calc_num_nodes"] = template_filters.calc_num_nodes template_environment.filters["calc_walltime"] = template_filters.calc_walltime template_environment.filters["calc_memory"] = template_filters.calc_memory template_environment.filters[ "check_utilization" ] = template_filters.check_utilization template_environment.filters[ "homogeneous_openmp_mpi_config" ] = template_filters.homogeneous_openmp_mpi_config template_environment.filters["get_config_value"] = flow_config.get_config_value template_environment.filters[ "get_account_name" ] = template_filters.get_account_name template_environment.filters["print_warning"] = template_filters.print_warning template_environment.filters["quote_argument"] = shlex.quote return template_environment def _template_environment(self, environment=None): if environment is None: environment = self._environment if environment not in self._template_environment_: template_environment = self._setup_template_environment() # Add environment-specific custom filters: for name, member in inspect.getmembers(environment): if getattr(member, "_flow_template_filter", False): template_environment.filters[name] = member self._template_environment_[environment] = template_environment return self._template_environment_[environment] def _get_standard_template_context(self): """Return the standard template context for run and submission scripts.""" context = {} context["project"] = self return context def _show_template_help_and_exit(self, template_environment, context): """Print all context variables and filters to screen and exit.""" from textwrap import TextWrapper wrapper = TextWrapper(width=90, break_long_words=False) print( TEMPLATE_HELP.format( template_dir=self._template_dir, template_vars="\n".join(wrapper.wrap(", ".join(sorted(context)))), filters="\n".join( wrapper.wrap(", ".join(sorted(template_environment.filters))) ), ) ) sys.exit(2) @property def project_hooks(self): """:class:`.hooks.Hooks` defined for all project operations. Project-wide hooks are added to an *instance* of the FlowProject, not the class. For example: .. code-block:: python def finish_hook(operation_name, job): print(f"Finished operation {operation_name} on job {job.id}") if __name__ == "__main__": project = FlowProject() project.project_hooks.on_exit.append(finish_hook) project.main() """ return self._project_hooks
[docs]    @classmethod
    def label(cls, label_name_or_func=None):
        """Designate a function as a label function for this class.

        For example, we can define a label function like this:

        .. code-block:: python

            @FlowProject.label
            def foo_label(job):
                if job.document.get('foo', False):
                    return 'foo-label-text'

        The ``foo-label-text`` label will now show up in the status view for
        each job where the ``foo`` key evaluates to ``True``.

        If the label function returns any type other than ``str``, the label
        name will be the name of the function if and only if the return value
        evaluates to ``True``, for example:

        .. code-block:: python

            @FlowProject.label
            def foo_label(job):
                return job.document.get('foo', False)

        Finally, you can specify a label name by providing it as the first
        argument to the ``label()`` decorator.

        Parameters
        ----------
        label_name_or_func : str or callable
            A label name or callable. (Default value = None)

        Returns
        -------
        callable
            A decorator for the label function.

        """
        if callable(label_name_or_func):
            # This handles the case where no label name is given, as in
            # @FlowProject.label. label_name_or_func is a function.
            cls._LABEL_FUNCTIONS[label_name_or_func] = None
            return label_name_or_func

        def label_func(func):
            # This handles the case where a label name is given, as in
            # @FlowProject.label("label_name"). label_name_or_func is a string.
            cls._LABEL_FUNCTIONS[func] = label_name_or_func
            return func

        return label_func
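The docstring above mentions a third form, passing an explicit label name; in practice it looks like this (the label name and file check are illustrative):

.. code-block:: python

    from flow import FlowProject


    class Project(FlowProject):
        pass


    @Project.label("data-collected")
    def has_data(job):
        # The status view shows "data-collected" whenever this returns True.
        return job.isfile("data.h5")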
[docs] def detect_operation_graph(self): """Determine the directed acyclic graph given by operation conditions. In general, executing a given operation registered with a FlowProject just involves checking the operation's preconditions and postconditions to determine eligibility. More generally, however, the preconditions and postconditions define a directed acyclic graph that governs the execution of all operations. Visualizing this graph can be useful for finding logic errors in the specified conditions, and having this graph computed also enables additional execution modes. For example, using this graph it is possible to determine exactly what operations need to be executed in order to make the operation eligible so that the task of executing all necessary operations can be automated. The graph is determined by iterating over all pairs of operations and checking for equality of preconditions and postconditions. The algorithm builds an adjacency matrix based on whether the preconditions for one operation match the postconditions for another. The comparison of operations is conservative; by default, conditions must be composed of identical code to be identified as equal (technically, they must have equivalent bytecode, i.e. ``cond1.__code__.co_code == cond2.__code__.co_code``). Users can specify that conditions should be treated as equal by providing tags to the operations. Given a :class:`~.FlowProject` subclass defined in a module ``project.py``, the output graph could be visualized using Matplotlib and NetworkX with the following code: .. code-block:: python import numpy as np import networkx as nx from matplotlib import pyplot as plt from project import Project project = Project() ops = project.operations.keys() adj = np.asarray(project.detect_operation_graph()) plt.figure() g = nx.DiGraph(adj) pos = nx.spring_layout(g) nx.draw(g, pos) nx.draw_networkx_labels( g, pos, labels={key: name for (key, name) in zip(range(len(ops)), [o for o in ops])}) plt.show() Returns ------- list of lists of int The adjacency matrix of operation dependencies. A zero indicates no dependency, and a 1 indicates dependency. This can be converted to a graph using NetworkX. Raises ------ :class:`RuntimeError` If a condition does not have a tag. This can occur when using ``functools.partial``, and a manually specified condition tag has not been set. """ def to_callbacks(conditions): """Get the actual callables associated with FlowConditions.""" return [condition._callback for condition in conditions] def unpack_conditions(condition_functions): """Create a set of callbacks from condition functions. Metaconditions in the list of condition functions are reduced into the functions that they are composed of. All condition functions must have a `_flow_tag` attribute defined. """ callbacks = set() for condition_function in condition_functions: # The condition function may not have a __name__ attribute in # cases where functools is used for condition creation. if ( hasattr(condition_function, "__name__") and condition_function.__name__ == "_flow_metacondition" ): callbacks = callbacks.union( unpack_conditions(condition_function._composed_of) ) else: if condition_function._flow_tag is None: raise RuntimeError( f"Condition {condition_function} was not tagged. " "To create a graph, ensure each base condition " "has a ``__code__`` attribute or manually " "specified tag." 
                        )
                    callbacks.add(condition_function._flow_tag)
            return callbacks

        operations = list(self.operations.values())
        mat = [[0 for _ in range(len(operations))] for _ in range(len(operations))]

        for i, operation_i in enumerate(operations):
            for j, operation_j in enumerate(operations[i:]):
                postconditions_i = unpack_conditions(
                    to_callbacks(operation_i._postconditions)
                )
                postconditions_j = unpack_conditions(
                    to_callbacks(operation_j._postconditions)
                )
                preconditions_i = unpack_conditions(
                    to_callbacks(operation_i._preconditions)
                )
                preconditions_j = unpack_conditions(
                    to_callbacks(operation_j._preconditions)
                )
                if postconditions_i.intersection(preconditions_j):
                    mat[i][j + i] = 1
                elif preconditions_i.intersection(postconditions_j):
                    mat[j + i][i] = 1
        return mat
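Since the returned matrix describes a directed acyclic graph (provided all conditions are tagged consistently), it can also be used to derive a valid execution order, complementing the visualization shown in the docstring. A short sketch that reuses the hypothetical ``project.py`` module from the example above:

.. code-block:: python

    import networkx as nx
    import numpy as np

    from project import Project  # the FlowProject subclass from the example above

    project = Project()
    adjacency = np.asarray(project.detect_operation_graph())
    graph = nx.DiGraph(adjacency)
    operation_names = list(project.operations)

    # A topological sort exists only if the condition graph is acyclic.
    order = [operation_names[node] for node in nx.topological_sort(graph)]
    print(" -> ".join(order))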
def _register_class_labels(self): """Register all label functions which are part of the class definition. To register a class method or function as a label function, use the :meth:`~.FlowProject.label()` class method as a decorator. """ def predicate(member): return inspect.ismethod(member) or inspect.isfunction(member) class_label_functions = {} for name, function in inspect.getmembers(type(self), predicate=predicate): if _is_label_func(function): class_label_functions[name] = function for name in sorted(class_label_functions): self._label_functions[class_label_functions[name]] = None def _register_labels(self): """Register all label functions registered with this class and its parent classes.""" self._register_class_labels() for cls in type(self).__mro__: self._label_functions.update(getattr(cls, "_LABEL_FUNCTIONS", {})) ALIASES = { str(status).replace("JobStatus.", ""): symbol for status, symbol in _FMT_SCHEDULER_STATUS.items() if status != JobStatus.placeholder } """Default aliases used within the status output.""" @classmethod def _alias(cls, name): """Use alias if specified. Parameters ---------- name : str Long name to abbreviate. Returns ------- str Abbreviation if it exists, otherwise the input name. """ try: return abbreviate(name, cls.ALIASES.get(name, name)) except TypeError: return name def _fn_bundle(self, bundle_id): """Return the canonical name to store bundle information.""" return os.path.join(self.path, ".bundles", bundle_id) @property def _bundle_prefix(self): sep = getattr(self._environment, "JOB_ID_SEPARATOR", "/") return f"{self.__class__.__name__}{sep}bundle{sep}" def _store_bundled(self, operations): """Store operation-ids as part of a bundle and return bundle id. The operation identifiers are stored in a text file whose name is determined by the _fn_bundle() method. This may be used to identify the status of individual operations from the bundle id. A single operation will not be stored, but instead the operation's id is directly returned. Parameters ---------- operations : A sequence of instances of :class:`._JobOperation` The operations to bundle. Returns ------- str The bundle id. """ if len(operations) == 1: return operations[0].id _id = sha1(".".join(op.id for op in operations).encode("utf-8")).hexdigest() bundle_id = self._bundle_prefix + _id fn_bundle = self._fn_bundle(bundle_id) os.makedirs(os.path.dirname(fn_bundle), exist_ok=True) with open(fn_bundle, "w") as file: for operation in operations: file.write(operation.id + "\n") return bundle_id def _expand_bundled_jobs(self, scheduler_jobs): """Expand jobs which were submitted as part of a bundle.""" if scheduler_jobs is None: return bundle_prefix = self._bundle_prefix for job in scheduler_jobs: if job.name().startswith(bundle_prefix): bundle_name = self._fn_bundle(job.name()) # Ensure that the bundle exists in this project before yielding # jobs from it. This check is necessary because scheduler jobs # with the same prefix could exist, submitted by other # FlowProjects with the same name from this user or other # users. See https://github.com/glotzerlab/signac-flow/issues/758 if os.path.exists(bundle_name): with open(bundle_name) as file: for line in file: yield ClusterJob(line.strip(), job.status()) else: yield job
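For reference, the bundle id written by ``_store_bundled`` is just the project-specific prefix followed by the SHA-1 digest of the dot-joined operation ids. A standalone sketch; the operation ids and project class name are placeholders:

.. code-block:: python

    from hashlib import sha1

    # Placeholder ids; real values come from _JobOperation instances.
    operation_ids = ["0f1e2d3c", "4b5a6978"]

    digest = sha1(".".join(operation_ids).encode("utf-8")).hexdigest()
    # Assuming the default "/" separator, used when the environment does not
    # define JOB_ID_SEPARATOR.
    bundle_id = f"MyProject/bundle/{digest}"
    print(bundle_id)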
[docs]    def scheduler_jobs(self, scheduler):
        """Fetch jobs from the scheduler.

        This function will fetch all scheduler jobs from the scheduler and
        also expand bundled jobs automatically. However, this function will
        not automatically filter scheduler jobs which are not associated with
        this project.

        Parameters
        ----------
        scheduler : :class:`~.Scheduler`
            The scheduler instance.

        Yields
        ------
        :class:`~.ClusterJob`
            All cluster jobs fetched from the scheduler.

        """
        yield from self._expand_bundled_jobs(scheduler.jobs())
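Typical usage pairs this method with the scheduler of the active compute environment. A sketch; it assumes a scheduler is available, otherwise ``get_scheduler()`` raises :class:`~.NoSchedulerError`:

.. code-block:: python

    from flow import FlowProject, get_environment

    project = FlowProject.get_project()
    scheduler = get_environment().get_scheduler()

    for cluster_job in project.scheduler_jobs(scheduler):
        print(cluster_job.name(), cluster_job.status())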
def _get_cached_scheduler_status(self): """Fetch all status information. The project document key ``_status`` is returned as a plain dict, or an empty dict if no status information is present. Returns ------- dict Dictionary of cached status information. The keys are uniquely generated ids for each group and job. The values are instances of :class:`~.JobStatus`. """ try: return self.document["_status"]() except KeyError: return {} @contextlib.contextmanager def _update_cached_scheduler_status(self): """Context manager used to update cached project status. When entered, this context manager yields an empty dictionary. The keys in this dictionary are unique generated ids for a given group and aggregate, and the value is an instance of :class:`~.JobStatus`. When the context exits, this dictionary is used to update the project document's cached status. Yields ------ dict Empty dictionary where status information should be stored. """ status_update = {} try: yield status_update finally: # This "finally" block cannot include "return", "break", or # "continue", or else saved exceptions raised in the context # manager will be lost and not be seen by the user. # https://docs.python.org/3/reference/compound_stmts.html#the-try-statement if status_update: status_update = { key: int(value) for key, value in status_update.items() } if "_status" in self.document: disk_status = self.document["_status"]() else: disk_status = {} disk_status.update(status_update) # Filter out JobStatus.unknown before writing to disk, to save # space and reduce the write time. disk_status = { key: value for key, value in disk_status.items() if value != int(JobStatus.unknown) } self.document["_status"] = disk_status def _generate_selected_aggregate_groups( self, selected_aggregates=None, selected_groups=None, tqdm_kwargs=None, ): """Yield selected aggregates and groups. Parameters ---------- selected_aggregates : sequence of tuples of :class:`~signac.job.Job` Aggregates to select. selected_groups : set of :class:`~.FlowGroup` Groups to select. tqdm_kwargs : dict or None A dict of keyword arguments to the tqdm progress bar used for iterating over aggregates. If None, no progress bar will be shown. (Default value = None) Yields ------ aggregate_id : str Selected aggregate id. aggregate : tuple of :class:`~signac.job.Job` Selected aggregate. group : :class:`~.FlowGroup` Selected group. """ def aggregate_progress_wrapper(aggregates): """Show progress bar if keyword arguments are provided.""" if tqdm_kwargs is not None: return tqdm( aggregates, total=len(aggregates), **tqdm_kwargs, ) else: return aggregates if ( selected_groups is not None and len(selected_groups) >= 0 and len(self._group_to_aggregate_store.inverse) > 1 ): # Use only aggregate stores for the selected groups. aggregate_stores_of_selected_groups = { self._group_to_aggregate_store[group] for group in selected_groups } aggregate_stores = { aggregate_store: self._group_to_aggregate_store.inverse[aggregate_store] for aggregate_store in aggregate_stores_of_selected_groups } else: # Use all aggregate stores. aggregate_stores = self._group_to_aggregate_store.inverse for ( aggregate_store, aggregate_groups, ) in aggregate_stores.items(): if selected_groups is not None: # Filter out groups that are not selected. The order of # aggregate_groups must be preserved. Using an intersection of # ordered sets would be preferable but would require a # dependency. 
matching_groups = [ group for group in aggregate_groups if group in selected_groups ] if len(matching_groups) == 0: # Skip aggregate store if no groups are selected continue else: matching_groups = aggregate_groups if selected_aggregates is not None: # Use selected aggregates in the aggregate store for aggregate in aggregate_progress_wrapper(selected_aggregates): aggregate_id = get_aggregate_id(aggregate) if aggregate_id in aggregate_store: for group in matching_groups: yield aggregate_id, aggregate, group else: # Use all aggregates in the aggregate store for aggregate_id, aggregate in aggregate_progress_wrapper( aggregate_store.items() ): for group in matching_groups: yield aggregate_id, aggregate, group def _generate_selected_aggregate_groups_with_status( self, scheduler_info=None, *args, **kwargs, ): r"""Yield selected aggregates and groups while updating cached status. After the iterator is exhausted (or if an exception is raised during iteration), the project's cached status will be updated for all aggregates and groups that have been yielded by this generator. Parameters ---------- scheduler_info : dict or None A dict of the form returned by :meth:`~._query_scheduler_status`, with keys corresponding to scheduler job names and values that are instances of :class:`~.JobStatus`. If None, all jobs will have unknown status. (Default value = None) \*args : Arguments forwarded to :meth:`~._generate_selected_aggregate_groups`. \*\*kwargs : Keyword arguments forwarded to :meth:`~._generate_selected_aggregate_groups`. Yields ------ scheduler_id : str Unique identifier for this aggregate and group. scheduler_status : :class:`~.JobStatus` The scheduler status of this aggregate and group. aggregate_id : str Selected aggregate id. aggregate : tuple of :class:`~signac.job.Job` Selected aggregate. group : :class:`~.FlowGroup` Selected group. """ if scheduler_info is None: scheduler_info = {} with self._update_cached_scheduler_status() as status_update: for ( aggregate_id, aggregate, group, ) in self._generate_selected_aggregate_groups(*args, **kwargs): scheduler_id = group._generate_id(aggregate) scheduler_status = scheduler_info.get(scheduler_id, JobStatus.unknown) status_update[scheduler_id] = scheduler_status yield scheduler_id, scheduler_status, aggregate_id, aggregate, group def _get_aggregate_group_status(self, aggregate, cached_status): """Fetch group status for this aggregate. Parameters ---------- aggregate : tuple of :class:`~signac.job.Job` Aggregate for which status information is fetched. cached_status : dict Dictionary of cached status information. The keys are uniquely generated ids for each group and job. The values are instances of :class:`~.JobStatus`. Yields ------ str Operation name. dict Operation status dictionary. """ starting_dict = functools.partial(dict, scheduler_status=JobStatus.unknown) status_dict = defaultdict(starting_dict) for ( aggregate_id, aggregate, group, ) in self._generate_selected_aggregate_groups( selected_aggregates=[aggregate], ): completed = group._complete(aggregate) # If the group is not completed, it is sufficient to determine # eligibility while ignoring postconditions (we know at least # one postcondition is not met). 
eligible = not completed and group._eligible( aggregate, ignore_conditions=IgnoreConditions.POST ) scheduler_status = cached_status.get( group._generate_id(aggregate), JobStatus.unknown ) for operation in group.operations: if scheduler_status >= status_dict[operation]["scheduler_status"]: status_dict[operation] = { "scheduler_status": scheduler_status, "eligible": eligible, "completed": completed, } yield from sorted(status_dict.items()) def _get_aggregate_status(self, aggregate, cached_status, ignore_errors=False): """Return status information about an aggregate. Parameters ---------- aggregate : tuple of :class:`~signac.job.Job` Aggregate for which status information is fetched. cached_status : dict Dictionary of cached status information. The keys are uniquely generated ids for each group and job. The values are instances of :class:`~.JobStatus`. (Default value = None) ignore_errors : bool Whether to ignore exceptions raised during status check. (Default value = False) Returns ------- dict A dictionary containing job status for all jobs. """ aggregate_id = get_aggregate_id(aggregate) result = { "job_id": aggregate_id, "operations": {}, "_operations_error": None, } try: result["operations"] = dict( self._get_aggregate_group_status(aggregate, cached_status) ) except Exception as error: logger.debug( "Error while getting operations status for job '%s': '%s'.", aggregate_id, error, ) if ignore_errors: result["_operations_error"] = str(error) else: raise return result
[docs]    def get_job_status(self, job, ignore_errors=False, cached_status=None):
        """Return status information about a job.

        Parameters
        ----------
        job : :class:`~signac.job.Job`
            The signac job.
        ignore_errors : bool
            Whether to ignore exceptions raised during status check.
            (Default value = False)
        cached_status : dict
            Dictionary of cached status information. The keys are uniquely
            generated ids for each group and job. The values are instances of
            :class:`~.JobStatus`. (Default value = None)

        Returns
        -------
        dict
            A dictionary containing status for the requested job.

        """
        if cached_status is None:
            cached_status = self._get_cached_scheduler_status()
        result = self._get_aggregate_status(
            aggregate=(job,),
            ignore_errors=ignore_errors,
            cached_status=cached_status,
        )
        labels_result = self._get_job_labels(job, ignore_errors=ignore_errors)
        result["labels"] = labels_result["labels"]
        result["_labels_error"] = labels_result["_labels_error"]
        return result
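The returned dictionary combines per-operation status with the job's labels. A usage sketch:

.. code-block:: python

    from flow import FlowProject

    project = FlowProject.get_project()
    job = next(iter(project))  # any job in the workspace

    status = project.get_job_status(job)
    print(status["job_id"], status["labels"])
    for name, op_status in status["operations"].items():
        print(name, op_status["eligible"], op_status["completed"])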
def _query_scheduler_status(self, err=None, ignore_errors=False): """Query the scheduler for job status. Parameters ---------- err : file-like object File where status information is printed. ignore_errors : bool Whether to ignore exceptions raised during status check. Returns ------- dict : A dictionary of scheduler job information. The keys are scheduler job names and the values are instances of :class:`~.JobStatus`. If the scheduler cannot be found or an error occurs with ``ignore_errors=True``, an empty dic is returned. """ if err is None: err = sys.stderr try: scheduler = self._environment.get_scheduler() print("Querying scheduler...", file=err) return { scheduler_job.name(): scheduler_job.status() for scheduler_job in self.scheduler_jobs(scheduler) } except NoSchedulerError: logger.debug("No scheduler available.") except RuntimeError as error: logger.warning("Error occurred while querying scheduler: '%s'.", error) if not ignore_errors: raise return {} def _get_job_labels(self, job, ignore_errors=False): """Return a dict with information about the labels of a job. Parameters ---------- job : :class:`signac.job.Job` Job handle. ignore_errors : bool Whether to ignore errors raised while fetching labels. (Default value = False) Returns ------- dict Dictionary with keys ``job_id``, ``labels``, and ``_labels_error``. """ result = { "job_id": job.id, "labels": [], "_labels_error": None, } try: result["labels"] = sorted(set(self.labels(job))) except Exception as error: logger.debug( "Error while determining labels for job '%s': '%s'.", job, error ) if ignore_errors: result["_labels_error"] = str(error) else: raise return result def _fetch_status( self, aggregates, err, ignore_errors, status_parallelization="none", hide_progress=False, names=None, ): """Fetch status for the provided aggregates / jobs. Parameters ---------- aggregates : sequence of aggregates The aggregates for which a user requested to fetch status. err : file-like object File where status information is printed. ignore_errors : bool Fetch status even if querying the scheduler fails. status_parallelization : str Parallelization mode for fetching the status. Allowed values are "thread", "process", or "none". (Default value = "none") hide_progress : bool Hide the progress bar when printing status output (Default value = False). names : iterable of :class:`str` Only show status for operations that match the provided set of names (interpreted as regular expressions), or all if the argument is None. (Default value = None) Returns ------- status_results : list A list of dictionaries containing keys ``aggregate_id``, ``groups``, and ``_error``. job_labels : list A list of dictionaries containing keys ``job_id``, ``labels``, and ``_labels_error``. individual_jobs : list of :class:`~signac.job.Job` List of jobs, filtered from aggregates containing one job. This is used internally to generate labels (labels are only supported by individual jobs) and the calling code in :meth:`~.print_status` also needs this information. This is returned from this method so that iteration over all aggregates only has to occur one time. """ if status_parallelization not in ("thread", "process", "none"): raise RuntimeError( "Configuration value status_parallelization is invalid. " "Valid choices are 'thread', 'process', or 'none'." 
) parallel_executor = _get_parallel_executor( status_parallelization, hide_progress ) # Update the project's status cache scheduler_info = self._query_scheduler_status( err=err, ignore_errors=ignore_errors ) def compute_status(data): scheduler_id, scheduler_status, aggregate_id, aggregate, group = data status = {} error_text = None try: status["scheduler_status"] = scheduler_status completed = group._complete(aggregate) status["completed"] = completed if len(group.operations) > 1: status["eligible"] = group._eligible(aggregate) else: # For groups with only one operation, if the group is not # complete, it is sufficient to determine eligibility while # ignoring postconditions (we know at least one # postcondition is not met). status["eligible"] = not completed and group._eligible( aggregate, ignore_conditions=IgnoreConditions.POST ) except Exception as error: logger.debug( "Error while getting operations status for job '%s': '%s'.", aggregate_id, error, ) if not ignore_errors: raise error_text = str(error) status["completed"] = False status["eligible"] = False result = [ { "aggregate_id": aggregate_id, "group_name": group.name, "status": status, "_error": error_text, } ] if ( group.name not in self._operations and scheduler_status != JobStatus.unknown ): operation_status = { **status, "scheduler_status": JobStatus._to_group(scheduler_status), } for op_name in group.operations: result.append( { "aggregate_id": aggregate_id, "group_name": op_name, # Need to copy status otherwise we can overwrite the status # dictionary of all constituent operations at once. "status": operation_status.copy(), "_error": error_text, } ) return result status_groups = self._gather_selected_flow_groups(names) with self._buffered(): aggregate_groups = list( self._generate_selected_aggregate_groups_with_status( scheduler_info=scheduler_info, selected_aggregates=aggregates, selected_groups=status_groups, ) ) status_results = [] for result in parallel_executor( compute_status, aggregate_groups, desc="Fetching status", file=err, ): status_results.extend(result) # aggregate_groups is a list of tuples containing scheduler, # aggregate, and group information. To compute labels, we fetch the # unique jobs from the aggregates containing only one job. if len(self._groups) > 0: individual_jobs = { aggregate_group[3][0] for aggregate_group in aggregate_groups if len(aggregate_group[3]) == 1 } else: # If no operations exist, use all jobs in the project. individual_jobs = set(self) compute_labels = functools.partial( self._get_job_labels, ignore_errors=ignore_errors, ) if len(self._label_functions) > 0: job_labels = list( parallel_executor( compute_labels, individual_jobs, desc="Fetching labels", file=err, ) ) else: job_labels = [] def combine_group_and_operation_status(aggregate_status_results): group_statuses = {} # Iterate over all status results for singleton groups and all user created groups. # Given group job statuses being exclusive with respect to standard statuses we can # store only the group status (e.g. Group Active) in group submission. for status_result in aggregate_status_results: group_name = status_result["group_name"] # Only true when both a user group and singleton group report a status. We use the # one that is not unknown, so we don't store this status and continue. 
                if (
                    group_name in group_statuses
                    and status_result["status"]["scheduler_status"]
                    == JobStatus.unknown
                ):
                    continue
                group_statuses[group_name] = status_result["status"]
            return group_statuses

        status_results_combined = []
        for aggregate_id, aggregate_results in groupby(
            sorted(status_results, key=lambda result: result["aggregate_id"]),
            key=lambda result: result["aggregate_id"],
        ):
            aggregate_results = list(aggregate_results)
            # Collect all errors that occurred while evaluating the status of
            # groups for this aggregate.
            error_message = None
            error_messages = [
                result["_error"] for result in aggregate_results if result["_error"]
            ]
            if error_messages:
                error_message = f"{len(error_messages)} error(s): " + ", ".join(
                    error_messages
                )
            status_results_combined.append(
                {
                    "aggregate_id": aggregate_id,
                    "groups": combine_group_and_operation_status(aggregate_results),
                    "_error": error_message,
                }
            )

        return status_results_combined, job_labels, individual_jobs

    PRINT_STATUS_ALL_VARYING_PARAMETERS = True
    """This constant can be used to signal that the print_status() method is
    supposed to automatically show all varying parameters."""
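The ``PRINT_STATUS_ALL_VARYING_PARAMETERS`` sentinel is consumed by :meth:`~.print_status` below, which expands it to every state point key that takes more than one value across the selected jobs. For example:

.. code-block:: python

    from flow import FlowProject

    project = FlowProject.get_project()
    project.print_status(
        detailed=True,
        parameters=FlowProject.PRINT_STATUS_ALL_VARYING_PARAMETERS,
    )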
[docs] def print_status( self, jobs=None, overview=True, overview_max_lines=None, detailed=False, parameters=None, param_max_width=None, expand=False, all_ops=False, only_incomplete=False, dump_json=False, unroll=True, compact=False, pretty=False, file=None, err=None, ignore_errors=False, template=None, profile=False, eligible_jobs_max_lines=None, output_format="terminal", hide_progress=False, operation=None, ): """Print the status of the project. Parameters ---------- jobs : iterable of :class:`~signac.job.Job` or aggregates If ``None``, print status for all jobs/aggregates. If not ``None``, only print status for the given jobs or aggregates (Default value = None). overview : bool Display an overview of the project status. (Default value = True) overview_max_lines : int Limit the number of overview lines. (Default value = None) detailed : bool Print a detailed status of each job. (Default value = False) parameters : list of str Print the value of the specified parameters. (Default value = None) param_max_width : int Limit the number of characters of parameter columns. (Default value = None) expand : bool Present labels and operations in two separate tables. (Default value = False) all_ops : bool Include operations that are not eligible to run. (Default value = False) only_incomplete : bool Only show jobs that have eligible operations. (Default value = False) dump_json : bool Output the data as JSON instead of printing the formatted output. (Default value = False) unroll : bool Separate columns for jobs and the corresponding operations. (Default value = True) compact : bool Print a compact version of the output. (Default value = False) pretty : bool Prettify the output. (Default value = False) file : str Redirect all output to this file, defaults to sys.stdout. err : str Redirect all error output to this file, defaults to sys.stderr. ignore_errors : bool Print status even if querying the scheduler fails. (Default value = False) template : str User provided Jinja2 template file. (Default value = None) profile : bool Show profile result. (Default value = False) eligible_jobs_max_lines : int Limit the number of operations and its eligible job count printed in the overview. (Default value = None) output_format : str Status output format, supports: 'terminal' (default), 'markdown' or 'html'. hide_progress : bool Hide the progress bar from the status output. (Default value = False) operation : iterable of :class:`str` Show status of operations that match the provided set of names (interpreted as regular expressions), or all if the argument is None. (Default value = None) """ if file is None: file = sys.stdout if err is None: err = sys.stderr aggregates = self._convert_jobs_to_aggregates(jobs) if eligible_jobs_max_lines is None: eligible_jobs_max_lines = self._flow_config["eligible_jobs_max_lines"] status_parallelization = self._flow_config["status_parallelization"] # initialize jinja2 template environment and necessary filters template_environment = self._template_environment() context = self._get_standard_template_context() # get job status information if profile: try: import pprofile except ImportError: raise ImportError( "Profiling requires the pprofile package. " "Install with `pip install pprofile`." 
) prof = pprofile.StatisticalProfile() fn_filter = [ inspect.getfile(threading), inspect.getfile(multiprocessing), inspect.getfile(Pool), inspect.getfile(ThreadPool), inspect.getfile(tqdm), ] with prof(single=False): status_results, job_labels, individual_jobs = self._fetch_status( aggregates=aggregates, err=err, ignore_errors=ignore_errors, status_parallelization=status_parallelization, hide_progress=hide_progress, names=operation, ) prof._mergeFileTiming() # Unrestricted total_impact = 0 hits = [ hit for fn, file_timing in prof.merged_file_dict.items() if fn not in fn_filter for hit in file_timing.iterHits() ] sorted_hits = reversed(sorted(hits, key=lambda hit: hit[2])) total_num_hits = sum(hit[2] for hit in hits) profiling_results = ["# Profiling:\n"] profiling_results.extend( ["Rank Impact Code object", "---- ------ -----------"] ) for i, (line, code, hits, duration) in enumerate(sorted_hits): impact = hits / total_num_hits total_impact += impact rank = i + 1 profiling_results.append( f"{rank:>4} {impact:>6.0%} {code.co_filename}:" f"{code.co_firstlineno}:{code.co_name}" ) if i > 10 or total_impact > 0.8: break for module_fn in prof.merged_file_dict: if re.match(profile, module_fn): file_timing = prof.merged_file_dict[module_fn] else: continue total_hits = file_timing.getTotalHitCount() total_impact = 0 profiling_results.append(f"\nHits by line for '{module_fn}':") profiling_results.append("-" * len(profiling_results[-1])) hits = list(sorted(file_timing.iterHits(), key=lambda h: 1 / h[2])) for line, code, hits, duration in hits: impact = hits / total_hits total_impact += impact profiling_results.append(f"{module_fn}:{line} ({impact:2.0%}):") try: lines, start = inspect.getsourcelines(code) except OSError: continue hits_ = [ file_timing.getHitStatsFor(line)[0] for line in range(start, start + len(lines)) ] profiling_results.extend( [ f"{h:>5} {lineno:>4}: {l.rstrip()}" for lineno, (l, h) in enumerate(zip(lines, hits_), start) ] ) profiling_results.append("") if total_impact > 0.8: break profiling_results.append(f"Total runtime: {int(prof.total_time)}s") if prof.total_time < 20: profiling_results.append( "Warning: Profiler ran only for a short time, " "results may be highly inaccurate." ) else: status_results, job_labels, individual_jobs = self._fetch_status( aggregates=aggregates, err=err, ignore_errors=ignore_errors, status_parallelization=status_parallelization, hide_progress=hide_progress, names=operation, ) profiling_results = None operations_errors = {status_entry["_error"] for status_entry in status_results} labels_errors = {status_entry["_labels_error"] for status_entry in job_labels} errors = list(filter(None, operations_errors.union(labels_errors))) if errors: logger.warning( "Some job status updates did not succeed due to errors. Number " "of unique errors: %i. Use --debug to list all errors.", len(errors), ) for i, error in enumerate(errors): logger.debug("Status update error #%i: '%s'", i + 1, error) # Get the total number of statuses before removing those with no # eligible groups. total_num_jobs_or_aggregates = len(status_results) def _has_any_eligible_group(status_entry): return any(group["eligible"] for group in status_entry["groups"].values()) if only_incomplete: # Remove jobs with no eligible groups from the status info. 
status_results = list(filter(_has_any_eligible_group, status_results)) total_num_eligible_jobs_or_aggregates = len(status_results) else: total_num_eligible_jobs_or_aggregates = sum( 1 for _ in filter(_has_any_eligible_group, status_results) ) def display_group_name(group_name): """Return the operation name or group name with number of operations.""" # If the name is from a group that is not an operation, we append the # number of operations to its name in the status. if group_name not in self._operations: num_operations = len(self._groups[group_name].operations) return f"{group_name} ({num_operations} ops)" return group_name statuses = {} # We store the name for display in statuses[aggregate_id][group_name]["display_name"] to # prevent the need for a Jinja filter. We store this as an additional parameter as multiple # places in the templates need this value including the statuses dictionary itself. for status_entry in status_results: statuses[status_entry["aggregate_id"]] = status_entry group_statuses = status_entry["groups"] for group_name, group_status in group_statuses.items(): group_status["display_name"] = display_group_name(group_name) # Add labels to the status information. for job_label_data in job_labels: job_id = job_label_data["job_id"] # There is no status information if the project has no operations. # If no status information exists for this job, we need to set # default values. if job_id in statuses: # Don't create label entries for job ids that were removed by # --only-incomplete-operations. statuses[job_id].setdefault("groups", {}) statuses[job_id]["labels"] = job_label_data["labels"] # If the dump_json variable is set, just dump all status info # formatted in JSON to screen. if dump_json: print(json.dumps(statuses, indent=4), file=file) return None if overview: # get overview info: progress = defaultdict(int) for status in job_labels: for label in status["labels"]: progress[label] += 1 # Sort the label progress by amount complete (descending), then # alphabetically. progress_sorted = list( islice( sorted(progress.items(), key=lambda x: (-x[1], x[0])), overview_max_lines, ) ) # Optionally expand parameters argument to all varying parameters. if parameters is self.PRINT_STATUS_ALL_VARYING_PARAMETERS: parameters = list( sorted( { key for job in individual_jobs for key in job.cached_statepoint.keys() if len( { _to_hashable(job.cached_statepoint.get(key)) for job in individual_jobs } ) > 1 } ) ) if parameters: # get parameters info def _add_parameters(status): aggregate_id = status["aggregate_id"] if aggregate_id.startswith("agg-"): # TODO: Fill parameters with empty values (or shared values?) raise ValueError("Cannot show parameters for aggregates.") job = self.open_job(id=aggregate_id) # Cache the job state point and document if used to render status parameters. 
statepoint = None document = None def dotted_get(mapping, key): """Fetch a value from a nested mapping using a dotted key.""" tokens = key.split(".") v = mapping for token in tokens: if v is None: return None v = v.get(token) return v status["parameters"] = {} for parameter in parameters: if not parameter.startswith("doc."): if parameter.startswith("sp."): parameter_name = parameter[3:] else: parameter_name = parameter if statepoint is None: statepoint = job.cached_statepoint status["parameters"][parameter] = shorten( str(self._alias(dotted_get(statepoint, parameter_name))), param_max_width, ) else: parameter_name = parameter[4:] if document is None: document = job.document() status["parameters"][parameter] = shorten( str(self._alias(dotted_get(document, parameter_name))), param_max_width, ) for status in statuses.values(): _add_parameters(status) for i, parameter in enumerate(parameters): parameters[i] = shorten(self._alias(str(parameter)), param_max_width) if detailed: # get detailed view info if compact: num_operations = len(self._groups) if pretty: OPERATION_STATUS_SYMBOLS = { "ineligible": "\u25cb", # open circle "eligible": "\u25cf", # black circle "active": "\u25b9", # open triangle "running": "\u25b8", # black triangle "completed": "\u2714", # check mark } # Pretty (unicode) symbols denoting the execution status of operations. else: OPERATION_STATUS_SYMBOLS = { "ineligible": "-", "eligible": "+", "active": "*", "running": ">", "completed": "X", } # ASCII symbols denoting the execution status of operations. operation_status_legend = " ".join( f"[{v}]:{k}" for k, v in OPERATION_STATUS_SYMBOLS.items() ) status_legend = " ".join(f"[{v}]:{k}" for k, v in self.ALIASES.items()) context["jobs"] = list(statuses.values()) context["total_num_jobs_or_aggregates"] = total_num_jobs_or_aggregates context[ "total_num_eligible_jobs_or_aggregates" ] = total_num_eligible_jobs_or_aggregates context["total_num_job_labels"] = len(job_labels) context["overview"] = overview context["detailed"] = detailed context["all_ops"] = all_ops context["parameters"] = parameters context["compact"] = compact context["pretty"] = pretty context["unroll"] = unroll context["status_legend"] = status_legend if overview: context["progress_sorted"] = progress_sorted if detailed: context["alias_bool"] = {True: "Y", False: "N"} context["scheduler_status_code"] = _FMT_SCHEDULER_STATUS if compact: context["extra_num_operations"] = max(num_operations - 1, 0) if not unroll: context["operation_status_legend"] = operation_status_legend context["operation_status_symbols"] = OPERATION_STATUS_SYMBOLS def _add_placeholder_operation(job): job["groups"][""] = { "completed": False, "eligible": False, "scheduler_status": JobStatus.placeholder, } for job in context["jobs"]: has_eligible_ops = any([v["eligible"] for v in job["groups"].values()]) if not has_eligible_ops and not context["all_ops"]: _add_placeholder_operation(job) op_counter = Counter() op_submission_status_counter = defaultdict(Counter) for job in context["jobs"]: for group_name, group_status in job["groups"].items(): # Exclude placeholder operations, which have no display name. 
if group_name != "": display_name = group_status["display_name"] if group_status["eligible"]: op_counter[display_name] += 1 op_submission_status_counter[display_name][ group_status["scheduler_status"] ] += 1 def _op_submission_summary(counter): """Generate string of statuses and counts, sorted by status.""" return ", ".join( f"[{_FMT_SCHEDULER_STATUS[status]}]: {count}" for status, count in sorted(counter.items()) ) op_counter_status = [ [ group_name, group_count, _op_submission_summary(op_submission_status_counter[group_name]), ] for group_name, group_count in op_counter.most_common( eligible_jobs_max_lines ) ] context["op_counter"] = op_counter_status num_omitted_operations = len(op_counter) - len(context["op_counter"]) if num_omitted_operations > 0: context["op_counter"].append( (f"[{num_omitted_operations} more operations omitted]", "", "") ) # We have to make a deep copy of the template environment if we're # using a process Pool for parallelism. Somewhere in the process of # manually pickling and dispatching tasks to individual processes # Python's reference counter loses track of the environment and ends up # destructing the template environment. This causes subsequent calls to # print_status to fail (although _fetch_status calls will still # succeed). template_environment_copy = ( deepcopy(template_environment) if status_parallelization == "process" else template_environment ) render_output = _render_status( template=template, template_environment=template_environment_copy, context=context, detailed=detailed, expand=expand, unroll=unroll, compact=compact, output_format=output_format, ) print(render_output, file=file) # Show profiling results (if enabled) if profiling_results: print("\n" + "\n".join(profiling_results), file=file)
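Status output can also be redirected; note that, despite the docstring's ``str`` annotation, ``file`` and ``err`` are used as streams by the implementation. A sketch that dumps the raw status data as JSON and renders a detailed view to the terminal:

.. code-block:: python

    import sys

    from flow import FlowProject

    project = FlowProject.get_project()

    # Write the raw status dictionary as JSON.
    with open("status.json", "w") as fh:
        project.print_status(dump_json=True, file=fh)

    # Render a detailed, pretty-printed status table to the terminal.
    project.print_status(detailed=True, pretty=True, file=sys.stdout)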
def _run_operations( self, operations, pretend=False, np=None, timeout=None, progress=False ): """Execute the next operations as specified by the project's workflow. See also: :meth:`~.run`. Parameters ---------- operations : Sequence of instances of :class:`_JobOperation` The operations to execute. pretend : bool Do not actually execute the operations, but show the commands that would have been executed (Default value = False). np : int Parallelize to the specified number of processors. Use -1 to parallelize over all available processors. The value None uses one processor (Default value = None). timeout : float An optional timeout for each operation in seconds after which execution will be cancelled. Use None to indicate no timeout (Default value = None). progress : bool Show a progress bar during execution (Default value = False). """ if timeout is not None and timeout < 0: timeout = None operations = list(operations) # ensure list if np is None or np == 1 or pretend: if progress: operations = tqdm(operations) for operation in operations: self._execute_operation(operation, timeout, pretend) else: logger.debug("Parallelized execution of %i operation(s).", len(operations)) with Pool(processes=cpu_count() if np < 0 else np) as pool: logger.debug( "Parallelized execution of %i operation(s).", len(operations) ) try: self._run_operations_in_parallel( pool, operations, progress, timeout ) except self._PickleError as error: raise RuntimeError( "Unable to parallelize execution due to a pickling " f"error: {error}." ) class _PickleError(Exception): """Indicates a pickling error while trying to parallelize the execution of operations.""" @staticmethod def _job_operation_to_tuple(operation): return ( operation.id, operation.name, [job.id for job in operation._jobs], operation.cmd, operation.directives, ) def _job_operation_from_tuple(self, data): id, name, job_ids, cmd, directives = data jobs = tuple(self.open_job(id=job_id) for job_id in job_ids) return _JobOperation( id, name, jobs, cmd, directives, directives._keys_set_by_user ) def _run_operations_in_parallel(self, pool, operations, progress, timeout): """Execute operations in parallel. This function executes the given list of operations with the provided process pool. Since pickling of the project instance is likely to fail, we manually pickle the project instance and the operations before submitting them to the process pool. Parameters ---------- pool : :class:`multiprocessing.Pool` Process pool. operations : Sequence of instances of :class:`_JobOperation` The operations to execute. progress : bool Show a progress bar during execution. timeout : float A timeout for each operation in seconds after which execution will be cancelled. Use None to indicate no timeout (Default value = None). """ try: serialized_project = cloudpickle.dumps(self) serialized_tasks = [ ( cloudpickle.loads, serialized_project, self._job_operation_to_tuple(operation), ) for operation in tqdm( operations, desc="Serialize tasks", file=sys.stderr ) ] except ( Exception ) as error: # Masking all errors since they must be pickling related. 
raise self._PickleError(error) results = [ pool.apply_async(_deserialize_and_run_operation, task) for task in serialized_tasks ] if progress: results = tqdm(results) for result in results: result.get(timeout=timeout) @contextlib.contextmanager def _run_with_hooks(self, operation): name = operation.name jobs = operation._jobs # Determine operation hooks operation_hooks = self._operation_hooks.get(name, _Hooks()) self.project_hooks.on_start(name, *jobs) operation_hooks.on_start(name, *jobs) try: yield except Exception as error: self.project_hooks.on_exception(name, error, *jobs) operation_hooks.on_exception(name, error, *jobs) raise else: self.project_hooks.on_success(name, *jobs) operation_hooks.on_success(name, *jobs) finally: self.project_hooks.on_exit(name, *jobs) operation_hooks.on_exit(name, *jobs) def _execute_operation(self, operation, timeout=None, pretend=False): """Execute an operation. Parameters ---------- operation : :class:`_JobOperation` The operation to execute. timeout : float An optional timeout for each operation in seconds after which execution will be cancelled. Use None to indicate no timeout (Default value = None). pretend : bool Do not actually execute the operations, but show the commands that would have been executed (Default value = False). """ if pretend: print(operation.cmd) return None logger.info("Execute operation '%s'...", operation) execution_error_message = ( f"An exception was raised during operation {operation.name} for " f"job or aggregate with id {get_aggregate_id(operation._jobs)}." ) # Check if we need to fork for operation execution... if ( # The 'fork' directive was provided and evaluates to True: operation.directives.get("fork", False) # A separate process is needed to cancel with timeout: or timeout is not None # The operation function is an instance of FlowCmdOperation: or isinstance(self._operations[operation.name], FlowCmdOperation) # The specified executable is not the same as the interpreter instance: or operation.directives.get("executable", sys.executable) != sys.executable ): # ... need to fork: logger.debug( "Forking to execute operation '%s' with cmd '%s'.", operation, operation.cmd, ) try: with self._run_with_hooks(operation): subprocess.run( operation.cmd, shell=True, timeout=timeout, check=True ) except subprocess.CalledProcessError as error: raise UserOperationError(execution_error_message) from error else: # ... executing operation in interpreter process as function: logger.debug( "Executing operation '%s' with current interpreter process (%s).", operation, os.getpid(), ) try: with self._run_with_hooks(operation): self._operations[operation.name](*operation._jobs) except Exception as error: raise UserOperationError(execution_error_message) from error def _get_default_directives(self): return { name: self.groups[name].operation_directives.get(name, {}) for name in self.operations }
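Whether an operation is forked into a subprocess or executed in the current interpreter follows the checks in ``_execute_operation`` above. The sketch below shows two operations that take the forked path; it assumes a recent signac-flow release in which ``FlowProject.operation`` accepts ``directives`` and ``cmd`` arguments:

.. code-block:: python

    from flow import FlowProject


    class Project(FlowProject):
        pass


    # Forked because the 'fork' directive evaluates to True.
    @Project.operation(directives={"fork": True})
    def analyze(job):
        print("running in a separate process for", job.id)


    # Command operations always fork: the returned string is run through the shell.
    @Project.operation(cmd=True)
    def echo_id(job):
        return f"echo {job.id}"


    if __name__ == "__main__":
        Project().main()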
[docs] def run( self, jobs=None, names=None, pretend=False, np=None, timeout=None, num=None, num_passes=1, progress=False, order=None, ignore_conditions=IgnoreConditions.NONE, ): """Execute all eligible operations for the given selection. This function will run in an infinite loop until all eligible operations are executed, unless it reaches the maximum number of passes per operation or the maximum number of executions. By default there is no limit on the total number of executions, but a specific operation will only be executed once per job. This is to avoid accidental infinite loops when no or faulty postconditions are provided. Parameters ---------- jobs : iterable of :class:`~signac.job.Job` or aggregates If ``None``, execute operations for all eligible jobs/aggregates. If not ``None``, only execute operations for the given jobs or aggregates (Default value = None). names : iterable of :class:`str` Only execute operations that match the provided set of names (interpreted as regular expressions), or all if the argument is None. (Default value = None) pretend : bool Do not actually execute the operations, but show the commands that would have been executed. (Default value = False) np : int Parallelize to the specified number of processors. Use -1 to parallelize over all available processors. The value None uses one processor (Default value = None). timeout : float An optional timeout for each operation in seconds after which execution will be cancelled. Use None to indicate no timeout (Default value = None). num : int The total number of operations that are executed will not exceed this argument if provided. (Default value = None) num_passes : int or None The total number of executions of one specific job-operation pair will not exceed this argument. The default is 1, there is no limit if this argument is None. progress : bool Show a progress bar during execution (Default value = False). order : str, callable, or None Specify the order of operations. Possible values are: * 'none' or None (no specific order) * 'by-job' (operations are grouped by job) * 'cyclic' (order operations cyclic by job) * 'random' (shuffle the execution order randomly) * callable (a callable returning a comparison key for an operation used to sort operations) The default value is ``'none'``, which is equivalent to ``'by-job'`` in the current implementation. .. note:: Users are advised to not rely on a specific execution order as a substitute for defining the workflow in terms of preconditions and postconditions. However, a specific execution order may be more performant in cases where operations need to access and potentially lock shared resources. ignore_conditions : :class:`~.IgnoreConditions` Specify if preconditions and/or postconditions are to be ignored when determining eligibility. The default is :class:`IgnoreConditions.NONE`. """ aggregates = self._convert_jobs_to_aggregates(jobs) # Get all matching FlowGroups if isinstance(names, str): raise ValueError( "The names argument of FlowProject.run() must be a sequence of strings, " "not a string." ) if names is None: names = list(self.operations) # Get default directives default_directives = self._get_default_directives() # Negative values for the execution limits, means 'no limit'. if num_passes and num_passes < 0: num_passes = None if num and num < 0: num = None if not isinstance(ignore_conditions, IgnoreConditions): raise ValueError( "The ignore_conditions argument of FlowProject.run() " "must be a member of class IgnoreConditions." 
) messages = [] def log(msg, lvl=logging.INFO): messages.append((msg, lvl)) reached_execution_limit = Event() def select(operation): if operation._jobs not in aggregates: return False if num is not None and select.total_execution_count >= num: reached_execution_limit.set() raise StopIteration # Reached total number of executions # Check whether the operation was executed more than the total number of allowed # passes *per operation* (default=1). if ( num_passes is not None and select.num_executions.get(operation, 0) >= num_passes ): log( f"Operation '{operation}' exceeds max. # of allowed " f"passes ({num_passes})." ) # Warn if an operation has no postconditions set. has_postconditions = ( len(self.operations[operation.name]._postconditions) > 0 ) if not has_postconditions: log( f"Operation '{operation.name}' has no postconditions!", logging.WARNING, ) return False # Reached maximum number of passes for this operation. # Increase execution counters for this operation. select.num_executions[operation] += 1 select.total_execution_count += 1 return True # Keep track of all executed job-operations; the number of executions # of each individual job-operation cannot exceed num_passes. select.num_executions = defaultdict(int) # Keep track of the total execution count, it may not exceed the value given by # num, if not None. # Note: We are not using sum(select.num_execution.values()) for efficiency. select.total_execution_count = 0 for i_pass in count(1): if reached_execution_limit.is_set(): logger.warning( "Reached the maximum number of operations that can be executed, but " "there are still eligible operations." ) break try: # Generate _JobOperation instances for selected groups and aggregates. with self._buffered(): operations = [] run_groups = set(self._gather_executable_flow_groups(names, i_pass)) for ( aggregate_id, aggregate, group, ) in self._generate_selected_aggregate_groups( selected_aggregates=aggregates, selected_groups=run_groups, ): operations.extend( group._create_run_job_operations( self._entrypoint, default_directives, aggregate, ignore_conditions, ) ) operations = list(filter(select, operations)) finally: if messages: for msg, level in set(messages): logger.log(level, msg) del messages[:] # clear if not operations: break # No more eligible operations or execution limits reached. def key_func_by_job(operation): # In order to group the aggregates in a by-job manner, we need # to first sort the aggregates using their aggregate id. return get_aggregate_id(operation._jobs) # Optionally re-order operations for execution if order argument is provided: if callable(order): operations = list(sorted(operations, key=order)) elif order == "cyclic": groups = [ list(group) for _, group in groupby( sorted(operations, key=key_func_by_job), key=key_func_by_job ) ] operations = list(_roundrobin(*groups)) elif order == "random": random.shuffle(operations) elif order is None or order in ("none", "by-job"): # by-job is the default order pass else: raise ValueError( "Invalid value for the 'order' argument, valid arguments are " "'none', 'by-job', 'cyclic', 'random', None, or a callable." ) logger.info( "Executing %i operation(s) (Pass #%02d)...", len(operations), i_pass, ) self._run_operations( operations, pretend=pretend, np=np, timeout=timeout, progress=progress )
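A typical invocation of :meth:`~.run` exercising several of the arguments documented above; the operation name pattern is illustrative:

.. code-block:: python

    from flow import FlowProject

    project = FlowProject.get_project()
    project.run(
        names=["simulate.*"],  # regular expressions matched against operation/group names
        num_passes=2,          # each job-operation pair runs at most twice
        order="random",        # shuffle the execution order
        np=4,                  # run up to four operations in parallel
        progress=True,
    )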
def _gather_selected_flow_groups(self, names=None, i_pass=1): r"""Grabs :class:`~.FlowGroup`\ s that match any of a set of names. The provided names can be any regular expression that fully matches a group name. Parameters ---------- names : iterable of :class:`str` Only select groups that match the provided set of names (interpreted as regular expressions), or all if the argument is None. (Default value = None) i_pass : int The current pass in `run`. Used to print the warning message only once. Returns ------- list List of groups matching the provided names. """ if names is None: return list(self._groups.values()) operations = {} absent_ops = [] for name in names: if name in operations: continue groups = [ group for group_name, group in self.groups.items() if re.fullmatch(name, group_name) ] if i_pass == 1 and len(groups) == 0: absent_ops.append(name) for group in groups: operations[group.name] = group if len(absent_ops) > 0: print( f"Unrecognized flow operation(s): {', '.join(absent_ops)}", file=sys.stderr, ) return list(operations.values()) def _gather_executable_flow_groups(self, names=None, i_pass=1): r"""Grabs immediately executable flow groups that match any given name. The provided names can be any regular expression that fully match a group name. Note ---- The behavior is distinct from ``_gather_selected_flow_groups`` in that for execution the default set is not all groups but all singleton groups (operations). Parameters ---------- names : iterable of :class:`str` Only select groups that match the provided set of names (interpreted as regular expressions), or all singleton groups if the argument is None. (Default value = None) i_pass : int The current pass in `run`. Used to print the warning message only once. Returns ------- list List of groups matching the provided names. """ if names is None: return [self._groups[op_name] for op_name in self.operations] operations = self._gather_selected_flow_groups(names, i_pass) # Have to verify no overlap to ensure all returned groups are # simultaneously executable. if not FlowProject._verify_group_compatibility(operations): raise ValueError( "Cannot specify groups or operations that will be included " "twice when using the -o/--operation option." ) return operations def _get_submission_operations( self, aggregates, default_directives, names=None, ignore_conditions=IgnoreConditions.NONE, ignore_conditions_on_execution=IgnoreConditions.NONE, ): r"""Grabs eligible :class:`~._JobOperation`\ s from :class:`~.FlowGroup`\ s. Parameters ---------- aggregates : sequence of tuples of :class:`~signac.job.Job` The aggregates to consider for submission. default_directives : dict The default directives to use for the operations. This is to allow for user specified groups to 'inherit' directives from ``default_directives``. If no defaults are desired, the argument can be set to an empty dictionary. This must be done explicitly, however. names : iterable of :class:`str` Only select operations that match the provided set of names (interpreted as regular expressions), or all if the argument is None. (Default value = None) ignore_conditions : :class:`~.IgnoreConditions` Specify if preconditions and/or postconditions are to be ignored when determining eligibility. The default is :class:`IgnoreConditions.NONE`. ignore_conditions_on_execution : :class:`~.IgnoreConditions` Specify if preconditions and/or postconditions are to be ignored when determining eligibility after submitting. The default is :class:`IgnoreConditions.NONE`. 
Yields ------ :class:`~._SubmissionJobOperation` Returns a :class:`~._SubmissionJobOperation` for submitting the group. The :class:`~._JobOperation` will have directives that have been collected appropriately from its contained operations. """ submission_groups = set(self._gather_executable_flow_groups(names)) # Fetch scheduler status scheduler_info = self._query_scheduler_status() for ( scheduler_id, scheduler_status, aggregate_id, aggregate, group, ) in self._generate_selected_aggregate_groups_with_status( scheduler_info=scheduler_info, selected_aggregates=aggregates, selected_groups=submission_groups, ): if group._eligible( aggregate=aggregate, ignore_conditions=ignore_conditions ) and self._eligible_for_submission( group, aggregate, scheduler_status, scheduler_info ): yield group._create_submission_job_operation( entrypoint=self._entrypoint, default_directives=default_directives, jobs=aggregate, ignore_conditions_on_execution=ignore_conditions_on_execution, ) @classmethod def _verify_group_compatibility(cls, groups): """Verify that all selected groups can be submitted together.""" return all(a.isdisjoint(b) for a in groups for b in groups if a != b) def _get_aggregate_from_id(self, id, check_abbrevations=True): # The logic in this function slightly resembles that of signac's # Project.open_job. # We exit early if the id belongs to a single job. if not id.startswith("agg-"): try: return (self.open_job(id=id),) except LookupError: raise LookupError(f"Did not find job with id {repr(id)}.") # Next, attempt to find the id as a direct match by # iterating over all the aggregate stores and trying to access the # aggregate ids from those instances. for aggregate_store in self._group_to_aggregate_store.inverse: try: # Assume the id exists and skip the __contains__ check for # performance. If the id doesn't exist in this aggregate_store, # it will raise an exception that can be ignored. return aggregate_store[id] except KeyError: pass if check_abbrevations: # No direct match was found, so check for abbreviated ids. Requires # iteration over all elements of all aggregate stores. matches = set() for aggregate_store in self._group_to_aggregate_store.inverse: for full_id in aggregate_store: if full_id.startswith(id): matches.add(aggregate_store[full_id]) if len(matches) == 1: return next(iter(matches)) elif len(matches) > 1: raise LookupError(f"Did not find aggregate with id {repr(id)}.") # By elimination, len(matches) == 0 raise KeyError(f"Did not find aggregate with id {repr(id)}.") def _convert_jobs_to_aggregates(self, jobs): """Convert sequences of signac jobs to aggregates. The ``jobs`` parameter in public methods like :meth:`~.run`, :meth:`~.submit`, and :meth:`~.print_status` accepts a sequence of signac jobs. This method converts that sequence into a sequence of aggregates (tuples containing single signac jobs). """ if jobs is None: return _AggregateStoresCursor(self) elif isinstance(jobs, _AggregatesCursor): return jobs # Handle user-provided jobs/aggregates aggregates = [] for aggregate in jobs: if isinstance(aggregate, signac.job.Job): # aggregate is a single signac job. if aggregate not in self: raise LookupError(f"Did not find job {aggregate} in the project") aggregates.append((aggregate,)) else: try: aggregate = tuple(aggregate) assert all(isinstance(job, signac.job.Job) for job in aggregate) except (AssertionError, TypeError) as error: raise TypeError( "Invalid jobs argument. Please provide a valid " "signac job or aggregate of jobs." 
) from error else: # aggregate is a tuple of signac jobs. # Ensure that the aggregate exists in one of the aggregate # stores associated with this project. This will raise an # error if not. aggregate_from_id = self._get_aggregate_from_id( get_aggregate_id(aggregate), check_abbrevations=False ) aggregates.append(aggregate_from_id) return aggregates @contextlib.contextmanager def _buffered(self): """Enable the use of buffered mode for certain functions.""" logger.debug("Entering buffered mode.") self._jobs_cursor = self.find_jobs() self._is_buffered = True with signac.buffered(): yield logger.debug("Exiting buffered mode.") self._is_buffered = False self._jobs_cursor = None def _generate_submit_script( self, _id, operations, template, show_template_help, **kwargs ): """Generate submission script to submit the execution of operations to a scheduler.""" if template is None: template = self._environment.template assert _id is not None template_environment = self._template_environment(self._environment) template = template_environment.get_template(template) context = self._get_standard_template_context() # The flow 'script.sh' file simply extends the base script # provided. The choice of base script is dependent on the # environment, but will default to the 'base_script.sh' provided # with signac-flow unless additional environment information is # detected. logger.info("Set 'base_script=%s'.", self._environment.template) logger.info("Use environment '%s'.", self._environment) context["base_script"] = self._environment.template context["environment"] = self._environment context["id"] = _id context["operations"] = list(operations) context.update(kwargs) context["resources"] = self._environment._get_scheduler_values(context) if show_template_help: self._show_template_help_and_exit(template_environment, context) return template.render(**context) def _submit_operations( self, operations, _id=None, parallel=False, flags=None, force=False, template="script.sh", show_template_help=False, **kwargs, ): r"""Submit a sequence of operations to the scheduler. Parameters ---------- operations : A sequence of instances of :class:`~._JobOperation` The operations to submit. _id : str The _id to be used for this submission. (Default value = None) parallel : bool Execute all bundled operations in parallel. (Default value = False) flags : list Additional options to be forwarded to the scheduler. (Default value = None) force : bool Ignore all warnings or checks during submission, just submit. (Default value = False) template : str The name of the template file to be used to generate the submission script. (Default value = "script.sh") show_template_help : bool Show information about available template variables and filters and exit. (Default value = False) \*\*kwargs Additional keyword arguments forwarded to :meth:`~.ComputeEnvironment.submit`. Returns ------- :class:`~.JobStatus` or None Returns the submission status after successful submission or None. 
""" if _id is None: _id = self._store_bundled(operations) print(f"Submitting cluster job '{_id}':", file=sys.stderr) def _msg(group): print(f" - Group: {group}", file=sys.stderr) return group try: script = self._generate_submit_script( _id=_id, operations=map(_msg, operations), template=template, show_template_help=show_template_help, parallel=parallel, force=force, **kwargs, ) except ConfigKeyError as error: key = str(error) raise SubmitError( f"Unable to submit, because of a configuration error.\n" f"The following key is missing: {key}.\n" f"Add the key to the configuration by executing:\n\n" f" $ signac config --global set {key} VALUE\n" ) else: # Keys which were explicitly set by the user, but are not evaluated by the # template engine are cause for concern and might hint at a bug in the template # script or ill-defined directives. Here we check whether all directive keys that # have been explicitly set by the user were actually evaluated by the template # engine and warn about those that have not been. keys_unused = { key for op in operations for key in op.directives._keys_set_by_user.difference( op.directives.keys_used ) if key not in ("fork", "nranks", "omp_num_threads") # ignore list } if keys_unused: logger.warning( "Some of the keys provided as part of the directives were not used by " "the template script, including: %s", ", ".join(sorted(keys_unused)), ) return self._environment.submit( _id=_id, script=script, flags=flags, **kwargs )
[docs]    def submit(
        self,
        bundle_size=1,
        jobs=None,
        names=None,
        num=None,
        parallel=False,
        force=False,
        ignore_conditions=IgnoreConditions.NONE,
        ignore_conditions_on_execution=IgnoreConditions.NONE,
        **kwargs,
    ):
        r"""Submit function for the project's main submit interface.

        Parameters
        ----------
        bundle_size : int
            Specify the number of operations to be bundled into one submission, defaults to 1.
        jobs : iterable of :class:`~signac.job.Job` or aggregates
            If ``None``, submit operations for all eligible jobs/aggregates. If not ``None``,
            only submit operations for the given jobs or aggregates (Default value = None).
        names : iterable of :class:`str`
            Only submit operations that match the provided set of names (interpreted as
            regular expressions), or all if the argument is None. (Default value = None)
        num : int
            Limit the total number of submitted operations, defaults to no limit.
        parallel : bool
            Execute all bundled operations in parallel. (Default value = False)
        force : bool
            Ignore all warnings or checks during submission, just submit.
            (Default value = False)
        ignore_conditions : :class:`~.IgnoreConditions`
            Specify if preconditions and/or postconditions are to be ignored when
            determining eligibility. The default is :class:`IgnoreConditions.NONE`.
        ignore_conditions_on_execution : :class:`~.IgnoreConditions`
            Specify if preconditions and/or postconditions are to be ignored when
            determining eligibility after submitting. The default is
            :class:`IgnoreConditions.NONE`.
        \*\*kwargs
            Additional keyword arguments forwarded to :meth:`~.ComputeEnvironment.submit`.
        """
        aggregates = self._convert_jobs_to_aggregates(jobs)

        # Regular argument checks and expansion
        if isinstance(names, str):
            raise ValueError(
                "The 'names' argument must be a sequence of strings, however "
                f"a single string was provided: {names}."
            )
        if not isinstance(ignore_conditions, IgnoreConditions):
            raise ValueError(
                "The ignore_conditions argument of FlowProject.submit() "
                "must be a member of class IgnoreConditions."
            )

        # Gather all eligible operations.
        with self._buffered():
            default_directives = self._get_default_directives()
            # The generator must be used *inside* the buffering context manager
            # for performance reasons.
            operation_generator = self._get_submission_operations(
                aggregates,
                default_directives,
                names,
                ignore_conditions,
                ignore_conditions_on_execution,
            )
            # islice takes the first "num" elements from the generator, or all
            # items if num is None.
            operations = list(islice(operation_generator, num))

        # Bundle them up and submit.
        with self._buffered():
            with self._update_cached_scheduler_status() as status_update:
                for bundle in _make_bundles(operations, bundle_size):
                    status = self._submit_operations(
                        operations=bundle,
                        parallel=parallel,
                        force=force,
                        **kwargs,
                    )
                    if status is not None:
                        # Operations were submitted, store status
                        for operation in bundle:
                            status_update[operation.id] = status
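# NOTE (editorial sketch, not part of flow.project): how submit() might be driven
# from Python. The helper name is hypothetical; ``project`` is assumed to be an
# instance of a user's FlowProject subclass.
def submit_compute_bundles(project):
    """Submit regex-selected operations in bundles of four (sketch only)."""
    project.submit(
        bundle_size=4,        # pack four operations into each scheduler job
        names=["compute.*"],  # regular expressions selecting operations/groups
        num=16,               # cap the total number of submitted operations
        parallel=True,        # execute bundled operations in parallel
    )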
@classmethod def _add_submit_args(cls, parser): """Add arguments to submit subcommand to parser.""" parser.add_argument( "flags", type=str, nargs="*", help="Flags to be forwarded to the scheduler." ) parser.add_argument( "--pretend", action="store_true", help="Do not really submit, but print the submission script to screen.", ) parser.add_argument( "--force", action="store_true", help="Ignore all warnings and checks, just submit.", ) parser.add_argument( "--ignore-conditions", type=str, choices=["none", "pre", "post", "all"], default=IgnoreConditions.NONE, action=_IgnoreConditionsConversion, help="Specify conditions to ignore for eligibility check.", ) parser.add_argument( "--ignore-conditions-on-execution", type=str, choices=["none", "pre", "post", "all"], default=IgnoreConditions.NONE, action=_IgnoreConditionsConversion, help="Specify conditions to ignore after submitting. May be useful " "for conditions that cannot be checked once scheduled.", ) cls._add_operation_selection_arg_group(parser) cls._add_operation_bundling_arg_group(parser) cls._add_template_arg_group(parser) @classmethod def _add_template_arg_group(cls, parser, default="script.sh"): """Add argument group to parser for template handling.""" template_group = parser.add_argument_group( "templating", "The execution and submission scripts are always generated from a script " f"which is by default called '{default}' and located within the default " "template directory. The system uses a default template if none is provided. " "The default template extends from a base template, which may be different " "depending on the local compute environment, e.g., 'slurm.sh' for an environment " "with SLURM scheduler. The name of the base template is provided with the " "'base_script' template variable.", ) template_group.add_argument( "--template", type=str, default=default, help="The name of the template file within the template directory. " "The standard template directory is '${{project_root}}/templates' and " "can be configured with the 'template_dir' configuration variable. " f"Default: '{default}'.", ) template_group.add_argument( "--template-help", dest="show_template_help", action="store_true", help="Show information about the template context, including available variables " "and filter functions; then exit.", ) @classmethod def _add_job_selection_args(cls, parser): parser.add_argument( "-j", "--job-id", type=str, nargs="+", help="Only select jobs that match the given id(s).", ) parser.add_argument( "-f", "--filter", type=str, nargs="+", help="Only select jobs that match the given state point filter.", ) @classmethod def _add_operation_selection_arg_group(cls, parser): """Add argument group to parser for job-operation selection.""" selection_group = parser.add_argument_group( "job-operation selection", "By default, all eligible operations for all jobs are selected. Use " "the options in this group to reduce this selection.", ) cls._add_job_selection_args(selection_group) selection_group.add_argument( "-o", "--operation", dest="operation_name", nargs="+", help="Only select operation or groups that match the given " "operation/group name(s). These are interpreted as regular expressions.", ) selection_group.add_argument( "-n", "--num", type=int, help="Limit the total number of operations/groups to be selected. 
A group is " "considered to be one operation even if it consists of multiple operations.", ) @classmethod def _add_operation_bundling_arg_group(cls, parser): """Add argument group to parser for operation bundling.""" bundling_group = parser.add_argument_group( "bundling", "Bundle multiple operations for execution, e.g., to submit them " "all together to a cluster job, or execute them in parallel within " "an execution script.", ) bundling_group.add_argument( "-b", "--bundle", type=int, nargs="?", const=0, default=1, dest="bundle_size", help="Bundle multiple operations for execution in a single " "scheduler job. When this option is provided without an argument, " "all eligible operations are combined into one bundle.", ) bundling_group.add_argument( "-p", "--parallel", action="store_true", help="Execute all operations in a single bundle in parallel.", ) @classmethod def _add_print_status_args(cls, parser): """Add arguments to parser for the :meth:`~.print_status` method.""" cls._add_job_selection_args(parser) view_group = parser.add_argument_group( "view", "Specify how to format the status display." ) view_group.add_argument( "--json", dest="dump_json", action="store_true", help="Do not format the status display, but dump all data formatted in JSON.", ) view_group.add_argument( "-d", "--detailed", action="store_true", help="Show a detailed view of all jobs and their labels and operations.", ) view_group.add_argument( "-a", "--all-operations", dest="all_ops", action="store_true", help="Show information about all operations, not just active or eligible ones.", ) view_group.add_argument( "--only-incomplete-operations", dest="only_incomplete", action="store_true", help="Only show information for jobs with incomplete operations.", ) view_group.add_argument( "--stack", action="store_false", dest="unroll", help="Show labels and operations in separate rows.", ) view_group.add_argument( "-1", "--one-line", dest="compact", action="store_true", help="Show only one line per job.", ) view_group.add_argument( "-e", "--expand", action="store_true", help="Display job labels and job operations in two separate tables.", ) view_group.add_argument("--pretty", action="store_true") view_group.add_argument( "--full", action="store_true", help="Show all available information (implies --detailed --all-operations).", ) view_group.add_argument( "--no-overview", action="store_false", dest="overview", help="Do not print an overview.", ) view_group.add_argument( "-m", "--overview-max-lines", type=_positive_int, help="Limit the number of lines in the overview.", ) view_group.add_argument( "-p", "--parameters", type=str, nargs="*", help="Display select parameters of the job state point " "(with optional prefix 'sp.') or job document (by using prefix 'doc.') " "in the detailed view.", ) view_group.add_argument( "--param-max-width", type=int, help="Limit the width of each parameter row." ) view_group.add_argument( "--eligible-jobs-max-lines", type=_positive_int, help="Limit the number of eligible jobs that are shown.", ) parser.add_argument( "--ignore-errors", action="store_true", help="Ignore errors that might occur when querying the scheduler.", ) view_group.add_argument( "--output-format", type=str, default="terminal", help="Set status output format: terminal, markdown, or html.", )
[docs]    def labels(self, job):
        """Yield all labels for the given ``job``.

        See also: :meth:`~.label`.

        Parameters
        ----------
        job : :class:`signac.job.Job`
            Job handle.

        Yields
        ------
        str
            Label value.

        """
        for label_func, label_name in self._label_functions.items():
            if label_name is None:
                label_name = getattr(
                    label_func,
                    "_label_name",
                    getattr(label_func, "__name__", type(label_func).__name__),
                )
            try:
                label_value = label_func(job)
            except TypeError:
                try:
                    label_value = label_func(self, job)
                except Exception:
                    label_func = getattr(self, label_func.__func__.__name__)
                    label_value = label_func(job)
            assert label_name is not None
            if isinstance(label_value, str):
                yield label_value
            elif bool(label_value) is True:
                yield label_name
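# NOTE (editorial sketch of the label protocol implemented above): in a user's
# project module (all names here are hypothetical), a function decorated with
# @MyProject.label is reported by labels(): a string return value is yielded
# verbatim, while any other truthy value yields the label's name.
from flow import FlowProject


class MyProject(FlowProject):
    pass


@MyProject.label
def processed(job):
    # Truthy -> labels() yields "processed"; a returned str would be yielded as-is.
    return job.doc.get("processed", False)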
[docs]    def completed_operations(self, job):
        """Determine which operations have been completed for the given job.

        Parameters
        ----------
        job : :class:`~signac.job.Job`
            The signac job handle.

        Yields
        ------
        str
            The names of the operations that are complete.
        """
        for name, op in self._operations.items():
            if op._complete((job,)):
                yield name
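# NOTE (editorial usage sketch, not part of flow.project): summarize the finished
# operations per job for a given FlowProject instance. The helper name is hypothetical.
def report_completed_operations(project):
    """Print the completed operation names for every job in ``project`` (sketch only)."""
    for job in project:
        print(job.id, sorted(project.completed_operations(job)))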
def _next_operations( self, jobs=None, operation_names=None, ignore_conditions=IgnoreConditions.NONE ): """Determine the next eligible operations for aggregates. Parameters ---------- jobs : tuple of :class:`~signac.job.Job` The signac job handles. By default all the aggregates are evaluated to get the next operation associated. operation_names : iterable of :class:`str` Only select operations that match the provided set of names (interpreted as regular expressions), or all single operation groups if the argument is None. (Default value = None) ignore_conditions : :class:`~.IgnoreConditions` Specify if preconditions and/or postconditions are to be ignored when determining eligibility. The default is :class:`IgnoreConditions.NONE`. Yields ------ :class:`~._JobOperation` All eligible operations for the provided jobs. """ if jobs is None: jobs = _AggregateStoresCursor(self) if operation_names is None: selected_groups = {self._groups[name] for name in self.operations} else: selected_groups = set(self._gather_executable_flow_groups(operation_names)) for ( aggregate_id, aggregate, group, ) in self._generate_selected_aggregate_groups( selected_aggregates=jobs, selected_groups=selected_groups, ): yield from group._create_run_job_operations( entrypoint=self._entrypoint, default_directives={}, jobs=aggregate, ignore_conditions=ignore_conditions, ) @classmethod def _collect_operations(cls): """Collect all operations added with the ``@FlowProject.operation`` decorator.""" operations = [] for parent_class in cls.__mro__: operations.extend(getattr(parent_class, "_OPERATION_FUNCTIONS", [])) return operations @classmethod def _collect_conditions(cls, attr): """Collect conditions from attr using the mro hierarchy.""" ret = defaultdict(list) for parent_class in cls.__mro__: for func, conds in getattr(parent_class, attr, {}).items(): ret[func].extend(conds) return ret @classmethod def _collect_preconditions(cls): """Collect all preconditions added with the ``@FlowProject.pre`` decorator.""" return cls._collect_conditions("_OPERATION_PRECONDITIONS") @classmethod def _collect_postconditions(cls): """Collect all postconditions added with the ``@FlowProject.post`` decorator.""" return cls._collect_conditions("_OPERATION_POSTCONDITIONS") def _register_operations(self): """Register all operation functions registered with this class and its parent classes.""" operations = self._collect_operations() preconditions = self._collect_preconditions() postconditions = self._collect_postconditions() for name, func in operations: if name in self._operations: raise FlowProjectDefinitionError( f"Repeat definition of operation with name '{name}'." ) # Extract preconditions/postconditions and directives from function: params = { "pre": preconditions.get(func, None), "post": postconditions.get(func, None), } # Update operation hooks self._operation_hooks[name].update( _Hooks(**self._OPERATION_HOOK_REGISTRY[func]) ) # Construct FlowOperation: if getattr(func, "_flow_cmd", False): self._operations[name] = FlowCmdOperation(cmd=func, **params) else: self._operations[name] = FlowOperation(op_func=func, **params)
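# NOTE (editorial sketch): the _collect_* helpers above merge registrations from
# parent classes by walking ``__mro__``. A toy version with hypothetical registry
# names, independent of flow's internals:
from collections import defaultdict


class BaseRegistry:
    _REGISTRY = {"shared": ["from-base"]}


class DerivedRegistry(BaseRegistry):
    _REGISTRY = {"shared": ["from-derived"]}


def collect_registry(cls, attr="_REGISTRY"):
    merged = defaultdict(list)
    for parent_class in cls.__mro__:
        for key, values in getattr(parent_class, attr, {}).items():
            merged[key].extend(values)
    return dict(merged)


assert collect_registry(DerivedRegistry) == {"shared": ["from-derived", "from-base"]}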
[docs] @classmethod def make_group(cls, name, submit_options="", run_options="", group_aggregator=None): r"""Make a :class:`~.FlowGroup` named ``name`` and return a decorator to make groups. A :class:`~.FlowGroup` is used to group operations together for running and submitting :class:`~._JobOperation`\ s. Examples -------- The code below creates a group and adds an operation to that group. .. code-block:: python example_group = FlowProject.make_group('example') @example_group @FlowProject.operation def foo(job): return "hello world" Parameters ---------- name : str The name of the :class:`~.FlowGroup`. submit_options : str The :meth:`FlowProject.run` options to pass when submitting the group. These will be included in all submissions. Submissions use run commands to execute. run_options : str The options to pass to ``entrypoint exec`` when running the group. Specifying this will cause the operation to be forked even if it otherwise would run in the current Python interpreter. group_aggregator : :class:`~.aggregator` An instance of :class:`~flow.aggregator` to associate with the :class:`FlowGroup`. If None, no aggregation takes place (Default value = None). Returns ------- :class:`~.FlowGroupEntry` The created group. """ if name in cls._GROUP_NAMES: raise FlowProjectDefinitionError( f"Repeat definition of group with name '{name}'." ) if any( name == operation_name for operation_name, _ in cls._OPERATION_FUNCTIONS ): raise FlowProjectDefinitionError( f"Cannot create a group with the same name as the existing operation '{name}'." ) cls._GROUP_NAMES.add(name) group_entry = FlowGroupEntry( name=name, project=cls, submit_options=submit_options, run_options=run_options, group_aggregator=group_aggregator, ) cls._GROUPS.append(group_entry) return group_entry
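# NOTE (editorial sketch extending the docstring example above): two operations
# attached to the same group can be run or submitted together as one unit
# (e.g. "-o postprocess"). Group and operation names are hypothetical; the
# decorator order follows the make_group docstring.
postprocess = FlowProject.make_group(name="postprocess")


@postprocess
@FlowProject.operation
def collect_results(job):
    pass


@postprocess
@FlowProject.operation
def make_plots(job):
    pass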
def _register_groups(self): """Register all groups. Operations are assigned to each group. Aggregators are created for each group and tracked in a bidirectional mapping. """ group_entries = [] # Gather all groups from class and parent classes. for cls in type(self).__mro__: group_entries.extend(getattr(cls, "_GROUPS", [])) # Initialize all groups without operations. Also store the aggregators # associated with each group. The aggregate stores are cached so that # equivalent aggregators only generate once. created_aggregate_stores = {} for entry in group_entries: group = FlowGroup( entry.name, submit_options=entry.submit_options, run_options=entry.run_options, ) self._groups[entry.name] = group # Handle unset aggregators if entry.group_aggregator is None: # Use the operation's aggregator for singleton groups # corresponding to single operations if entry.name in self._operations: operation = self._operations[entry.name] if isinstance(operation, FlowCmdOperation): entry.group_aggregator = operation._cmd._flow_aggregate else: entry.group_aggregator = operation._op_func._flow_aggregate # The default group aggregator just iterates over jobs else: entry.group_aggregator = aggregator.groupsof() if entry.group_aggregator not in created_aggregate_stores: created_aggregate_stores[ entry.group_aggregator ] = entry.group_aggregator._create_AggregateStore(self) # Associate the group with its aggregate store self._group_to_aggregate_store[group] = created_aggregate_stores[ entry.group_aggregator ] # Add operations and directives to group for operation_name, operation in self._operations.items(): if isinstance(operation, FlowCmdOperation): func = operation._cmd else: func = operation._op_func op_directives = getattr(func, "_flow_group_operation_directives", {}) for cls in self.__class__.__mro__: # Need to use `get` since we don't know which class in the # hierarchy this function was registered to. for group_name in func._flow_groups.get(cls, []): directives = op_directives.get(group_name) self._groups[group_name].add_operation( operation_name, operation, directives ) # For singleton groups add directives directives = getattr(func, "_flow_directives", {}) self._groups[operation_name].operation_directives[ operation_name ] = directives def _reregister_aggregates(self): """Re-register the aggregates present in this :class:`~.FlowProject`.""" # TODO: This method could be consolidated with the code in _register_groups. # For now, we will not put it into the public API. for group in self._groups.values(): aggregator = self._group_to_aggregate_store[group] if isinstance(aggregator, _AggregateStore): aggregator._register_aggregates() self._group_to_aggregate_store[group] = aggregator @property def operations(self): """Get the dictionary of operations that have been added to the workflow.""" return self._operations @property def groups(self): """Get the dictionary of groups that have been added to the workflow.""" return self._groups def _eligible_for_submission( self, flow_group, jobs, scheduler_status, cached_status ): """Check group eligibility for submission with an aggregate. By default, a group is eligible for submission when it is not considered active, that means already queued or running. Parameters ---------- flow_group : :class:`~.FlowGroup` The FlowGroup used to determine eligibility. aggregate : tuple of :class:`~signac.job.Job` The aggregate of signac jobs. 
scheduler_status : :class:`~.JobStatus` The status of the provided group and aggregate (this should be known by the calling code and is re-used instead of fetching from the ``cached_status`` for efficiency). cached_status : dict Dictionary of status information. The keys are uniquely generated ids for each group and aggregate. The values are instances of :class:`~.JobStatus`. Returns ------- bool Whether the group is eligible for submission with the provided aggregate. """ def _group_is_submitted(flow_group): """Check if group has been submitted for the provided jobs.""" group_id = flow_group._generate_id(jobs) job_status = JobStatus(cached_status.get(group_id, JobStatus.unknown)) return job_status >= JobStatus.submitted if scheduler_status >= JobStatus.submitted: return False # Check if any other groups containing an operation from this group # have been submitted. Submitting both groups might cause conflicts. for other_group in self._groups.values(): if not flow_group.isdisjoint(other_group) and _group_is_submitted( other_group ): return False return True def _main_status(self, args): """Print status overview.""" aggregates = self._select_jobs_from_args(args) if args.compact and not args.unroll: logger.warning( "The -1/--one-line argument is incompatible with " "'--stack' and will be ignored." ) args = { key: val for key, val in vars(args).items() if key not in [ "func", "verbose", "debug", "job_id", "filter", ] } if args.pop("full"): args["detailed"] = args["all_ops"] = True start = time.time() try: self.print_status(jobs=aggregates, **args) except Exception as error: logger.error( f"Error during status update: {str(error)}\nUse '--ignore-errors' to " "complete the update anyways." ) raise error else: if aggregates is None: length_jobs = sum( len(aggregate_store) for aggregate_store in self._group_to_aggregate_store.inverse ) else: length_jobs = len(aggregates) # Use small offset to account for overhead with few jobs delta_t = (time.time() - start - 0.5) / max(length_jobs, 1) config_key = "status_performance_warn_threshold" warn_threshold = self._flow_config[config_key] if not args["profile"] and delta_t > warn_threshold >= 0: print( "WARNING: " f"The status compilation took more than {warn_threshold}s per job. " "Consider using `--profile` to determine bottlenecks " "within the project workflow definition.\n" f"Execute `signac config set flow.{config_key} VALUE` to specify " "the warning threshold in seconds.\n" "To speed up the compilation, try executing " "`signac config set flow.status_parallelization 'process'` to set " "the status_parallelization config value to process." "Use -1 to completely suppress this warning.\n", file=sys.stderr, ) def _main_next(self, args): """Determine the jobs that are eligible for a specific operation.""" if args.name not in self.operations: print( f"The requested flow operation '{args.name}' does not exist.", file=sys.stderr, ) else: for operation in self._next_operations(): # This filter cannot use the operation_names parameter to # _next_operations because it must be an exact match, not a # regex match. if args.name == operation.name: print(get_aggregate_id(operation._jobs)) def _main_run(self, args): """Run all (or select) job operations.""" # Select jobs: aggregates = self._select_jobs_from_args(args) # Setup partial run function, because we need to call this either # inside some context managers or not based on whether we need # to switch to the project root directory or not. 
run = functools.partial( self.run, jobs=aggregates, names=args.operation_name, pretend=args.pretend, np=args.parallel, timeout=args.timeout, num=args.num, num_passes=args.num_passes, progress=args.progress, order=args.order, ignore_conditions=args.ignore_conditions, ) if args.switch_to_project_root: with _add_cwd_to_environment_pythonpath(): with _switch_to_directory(self.path): run() else: run() def _main_submit(self, args): """Submit jobs to a scheduler.""" kwargs = vars(args) # Select jobs: aggregates = self._select_jobs_from_args(args) names = args.operation_name if args.operation_name else None self.submit(jobs=aggregates, names=names, **kwargs) def _main_exec(self, args): aggregates = self._select_jobs_from_args(args) try: operation = self._operations[args.operation] if isinstance(operation, FlowCmdOperation): def operation_function(job): cmd = operation(job) subprocess.run(cmd, shell=True, check=True) else: operation_function = operation except KeyError: raise KeyError(f"Unknown operation '{args.operation}'.") for aggregate_id, aggregate, group in self._generate_selected_aggregate_groups( selected_aggregates=aggregates, selected_groups={self._groups[args.operation]}, ): operation_function(*aggregate) def _select_jobs_from_args(self, args): """Select jobs with the given command line arguments ('-j/-f/--job-id').""" if not args.func == self._main_exec and args.job_id and (args.filter): raise ValueError( "Cannot provide both -j/--job-id and -f/--filter in combination." ) if args.job_id: # aggregates must be a set to prevent duplicate entries aggregates = set() for job_id in args.job_id: try: aggregates.add( self._get_aggregate_from_id(job_id, check_abbrevations=True) ) except KeyError as error: raise LookupError(error) return list(aggregates) elif args.func == self._main_exec: # exec command does not support filters, so we must exit early. return _AggregateStoresCursor(self) elif args.filter: # filter, including doc_filter provided. Filters can only be used to select # single jobs and not aggregates of multiple jobs. filter_ = parse_filter_arg(args.filter) return _JobAggregateCursor(self, filter_) else: # Use all aggregates return _AggregateStoresCursor(self)
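# NOTE (editorial sketch, not part of flow.project): for a command-style operation,
# the ``exec`` path above renders the command string for the job and runs it through
# the shell. Roughly, with hypothetical names:
import subprocess


def run_rendered_command(render, job):
    """Run the shell command produced by ``render(job)`` (sketch only)."""
    cmd = render(job)
    subprocess.run(cmd, shell=True, check=True)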
[docs] def main(self, parser=None): """Call this function to use the main command line interface. In most cases one would want to call this function as part of the class definition: .. code-block:: python # my_project.py from flow import FlowProject class MyProject(FlowProject): pass if __name__ == '__main__': MyProject().main() The project can then be executed on the command line: .. code-block:: bash $ python my_project.py --help Parameters ---------- parser : :class:`argparse.ArgumentParser` The argument parser used to implement the command line interface. If None, a new parser is constructed. (Default value = None) """ # Find file that main is called in. When running through the command # line interface, we know exactly what the entrypoint path should be: # it's the file where main is called, which we can pull off the stack. self._entrypoint.setdefault( "path", os.path.realpath(inspect.stack()[-1].filename) ) if parser is None: parser = argparse.ArgumentParser() base_parser = argparse.ArgumentParser(add_help=False) # The argparse module does not automatically merge options shared between the main # parser and the subparsers. We therefore assign different destinations for each # option and then merge them manually below. for prefix, _parser in (("main_", parser), ("", base_parser)): _parser.add_argument( "-v", "--verbose", dest=prefix + "verbose", action="count", default=0, help="Increase output verbosity.", ) _parser.add_argument( "--debug", dest=prefix + "debug", action="store_true", help="This option implies `-vv`.", ) subparsers = parser.add_subparsers() parser_status = subparsers.add_parser( "status", parents=[base_parser], description="Parallelization of the status command can be " "controlled by setting the flow.status_parallelization config " "value to 'thread' (default), 'none', or 'process'. To do this, " "execute `signac config set flow.status_parallelization VALUE`.", ) self._add_print_status_args(parser_status) parser_status.add_argument( "--profile", const=inspect.getsourcefile(inspect.getmodule(self)), nargs="?", help="Collect statistics to determine code paths that are responsible " "for the majority of runtime required for status determination. " "Optionally provide a filename pattern to select for what files " "to show result for. Defaults to the main module. " "(requires pprofile)", ) parser_status.add_argument( "--hide-progress", action="store_true", help="Hide the progress bar", ) parser_status.add_argument( "-o", "--operation", type=str, nargs="+", help="Select operation or groups that match the given " "operation/group name(s). 
These are interpreted as regular expressions.", ) parser_status.set_defaults(func=self._main_status) parser_next = subparsers.add_parser( "next", parents=[base_parser], description="Determine jobs that are eligible for a specific operation.", ) parser_next.add_argument("name", type=str, help="The name of the operation.") parser_next.set_defaults(func=self._main_next) parser_run = subparsers.add_parser( "run", parents=[base_parser], ) self._add_operation_selection_arg_group(parser_run) execution_group = parser_run.add_argument_group("execution") execution_group.add_argument( "--pretend", action="store_true", help="Do not actually execute commands, just show them.", ) execution_group.add_argument( "--progress", action="store_true", help="Display a progress bar during execution.", ) execution_group.add_argument( "--num-passes", type=int, default=1, help="Specify how many times a particular job-operation may be executed within one " "session (default=1). This is to prevent accidental infinite loops, " "where operations are executed indefinitely, because postconditions " "were not properly set. Use -1 to allow for an infinite number of passes.", ) execution_group.add_argument( "-t", "--timeout", type=float, help="A timeout in seconds after which the execution of one operation is canceled.", ) execution_group.add_argument( "--switch-to-project-root", action="store_true", help="Temporarily add the current working directory to the python search path and " "switch to the root directory prior to execution.", ) execution_group.add_argument( "-p", "--parallel", type=int, nargs="?", const="-1", help="Specify the number of cores to parallelize to. Defaults to all available " "processing units.", ) execution_group.add_argument( "--order", type=str, choices=["none", "by-job", "cyclic", "random"], default=None, help="Specify the execution order of operations for each execution pass.", ) execution_group.add_argument( "--ignore-conditions", type=str, choices=["none", "pre", "post", "all"], default=IgnoreConditions.NONE, action=_IgnoreConditionsConversion, help="Specify conditions to ignore for eligibility check.", ) parser_run.set_defaults(func=self._main_run) parser_submit = subparsers.add_parser( "submit", parents=[base_parser], conflict_handler="resolve", ) self._add_submit_args(parser_submit) env_group = parser_submit.add_argument_group( f"{self._environment.__name__} options" ) self._environment.add_args(env_group) parser_submit.set_defaults(func=self._main_submit) parser_exec = subparsers.add_parser( "exec", parents=[base_parser], ) parser_exec.add_argument( "operation", type=str, choices=list(sorted(self._operations)), help="The operation to execute.", ) parser_exec.add_argument( "job_id", type=str, nargs="*", help="The job ids or aggregate ids in the FlowProject. " "Defaults to all jobs and aggregates.", ) parser_exec.set_defaults(func=self._main_exec) args = parser.parse_args() if not hasattr(args, "func"): parser.print_usage() sys.exit(2) # Manually 'merge' the various global options defined for both the main parser # and the parent parser that are shared by all subparsers: for dest in ("verbose", "debug"): setattr(args, dest, getattr(args, "main_" + dest) or getattr(args, dest)) delattr(args, "main_" + dest) if args.debug: # Implies '-vv' args.verbose = max(2, args.verbose) # Support print_status argument alias if args.func == self._main_status and args.full: args.detailed = args.all_ops = True # Empty parameters argument on the command line means: show all varying parameters. 
if hasattr(args, "parameters"): if args.parameters is not None and len(args.parameters) == 0: args.parameters = self.PRINT_STATUS_ALL_VARYING_PARAMETERS # Set verbosity level according to the `-v` argument. logging.basicConfig(level=max(0, logging.WARNING - 10 * args.verbose)) if args.verbose >= 1: print( "Using environment configuration:", self._environment.__name__, file=sys.stderr, ) try: args.func(args) except (TimeoutError, subprocess.TimeoutExpired) as error: print( "Error: Failed to complete execution due to " f"timeout ({args.timeout} seconds).", file=sys.stderr, ) raise error except Jinja2TemplateNotFound as error: print(f"Did not find template script '{error}'.", file=sys.stderr) raise error
def _deserialize_and_run_operation(loads, project, operation_data):
    """Deserialize a pickled project and execute the operation reconstructed from ``operation_data``."""
    project = loads(project)
    project._execute_operation(project._job_operation_from_tuple(operation_data))
    return None


__all__ = [
    "FlowProject",
    "FlowOperation",
    "label",
    "staticlabel",
    "classlabel",
]
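# NOTE (editorial sketch, not part of flow.project): _deserialize_and_run_operation
# receives a project serialized in the parent process together with a deserializer
# such as cloudpickle.loads (cloudpickle is imported at the top of this module).
# The serialize/restore round trip itself looks like this, with a toy payload:
import cloudpickle


def roundtrip(obj):
    """Serialize and restore ``obj`` with cloudpickle (sketch only)."""
    return cloudpickle.loads(cloudpickle.dumps(obj))


assert roundtrip({"operation": "compute"}) == {"operation": "compute"}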