API Reference

This is the API for the signac-flow application.

The FlowProject

Attributes

FlowProject.ALIASES

Default aliases used within the status output.

FlowProject.completed_operations(job)

Determine which operations have been completed for job.

FlowProject.get_job_status(job[, ...])

Return status information about a job.

FlowProject.label([label_name_or_func])

Designate a function as a label function for this class.

FlowProject.labels(job)

Yield all labels for the given job.

FlowProject.main([parser])

Call this function to use the main command line interface.

FlowProject.make_group(name[, ...])

Make a FlowGroup named name and return a decorator to make groups.

FlowProject.operation

Add operation functions to the class workflow definition.

FlowProject.operation_hooks

alias of _HooksRegister

FlowProject.operation_hooks.on_exception(...)

Add a hook function triggered after the operation exits with an error.

FlowProject.operation_hooks.on_exit(hook_func)

Add a hook function triggered after the operation exits.

FlowProject.operation_hooks.on_start(hook_func)

Add a hook function triggered before an operation starts.

FlowProject.operation_hooks.on_success(hook_func)

Add a hook function triggered after the operation exits without error.

FlowProject.operations

Get the dictionary of operations that have been added to the workflow.

FlowProject.post(condition[, tag])

Define and evaluate postconditions for operations.

FlowProject.post.copy_from(*other_funcs)

Copy postconditions from other operation(s).

FlowProject.post.false(key)

Evaluate if a document key is False for the job(s).

FlowProject.post.isfile(filename)

Determine if the specified file exists for the job(s).

FlowProject.post.never(func)

Return False.

FlowProject.post.not_(condition)

Return not condition(*jobs) for the provided condition function.

FlowProject.post.true(key)

Evaluate if a document key is True for the job(s).

FlowProject.pre(condition[, tag])

Define and evaluate preconditions for operations.

FlowProject.pre.after(*other_funcs)

Precondition to run an operation after other operations.

FlowProject.pre.copy_from(*other_funcs)

Copy preconditions from other operation(s).

FlowProject.pre.false(key)

Evaluate if a document key is False for the job(s).

FlowProject.pre.isfile(filename)

Determine if the specified file exists for the job(s).

FlowProject.pre.never(func)

Return False.

FlowProject.pre.not_(condition)

Return not condition(*jobs) for the provided condition function.

FlowProject.pre.true(key)

Evaluate if a document key is True for the job(s).

FlowProject.print_status([jobs, overview, ...])

Print the status of the project.

FlowProject.project_hooks

hooks.Hooks defined for all project operations.

FlowProject.run([jobs, names, pretend, np, ...])

Execute all eligible operations for the given selection.

FlowProject.scheduler_jobs(scheduler)

Fetch jobs from the scheduler.

FlowProject.submit([bundle_size, jobs, ...])

Submit function for the project's main submit interface.

class flow.FlowProject(path=None, environment=None, entrypoint=None)

Bases: Project

A signac project class specialized for workflow management.

This class is used to define, execute, and submit workflows based on operations and conditions.

Users typically interact with this class through its command line interface.

This is a typical example of how to use this class:

from flow import FlowProject


@FlowProject.operation
def hello(job):
    print('hello', job)


if __name__ == '__main__':
    FlowProject().main()

Parameters:
  • path (str, optional) – The project directory. If None, the current working directory is used. (Default value = None)

  • environment (flow.environment.ComputeEnvironment) – An environment to use for scheduler submission. If None, the environment is automatically identified. The default is None.

  • entrypoint (dict) – A dictionary with two possible keys: 'executable' and 'path'. The path is the location of the script file (the script file must call FlowProject.main()). The executable is the location of the Python interpreter used to execute BaseFlowOperation instances that are Python functions.

ALIASES = {'active': 'A', 'error': 'E', 'group_active': 'GA', 'group_error': 'GE', 'group_held': 'GH', 'group_inactive': 'GI', 'group_queued': 'GQ', 'group_registered': 'GR', 'group_submitted': 'GS', 'held': 'H', 'inactive': 'I', 'queued': 'Q', 'registered': 'R', 'submitted': 'S', 'unknown': 'U'}

Default aliases used within the status output.

PRINT_STATUS_ALL_VARYING_PARAMETERS = True

This constant can be used to signal that the print_status() method is supposed to automatically show all varying parameters.

completed_operations(job)

Determine which operations have been completed for job.

Parameters:

job (Job) – The signac job handle.

Yields:

str – The names of the operations that are complete.

detect_operation_graph()

Determine the directed acyclic graph given by operation conditions.

In general, executing a given operation registered with a FlowProject just involves checking the operation’s preconditions and postconditions to determine eligibility. More generally, however, the preconditions and postconditions define a directed acyclic graph that governs the execution of all operations. Visualizing this graph can be useful for finding logic errors in the specified conditions, and having this graph computed also enables additional execution modes. For example, using this graph it is possible to determine exactly what operations need to be executed in order to make the operation eligible so that the task of executing all necessary operations can be automated.

The graph is determined by iterating over all pairs of operations and checking for equality of preconditions and postconditions. The algorithm builds an adjacency matrix based on whether the preconditions for one operation match the postconditions for another. The comparison of operations is conservative; by default, conditions must be composed of identical code to be identified as equal (technically, they must have equivalent bytecode, i.e. cond1.__code__.co_code == cond2.__code__.co_code). Users can specify that conditions should be treated as equal by providing tags to the operations.
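The pairwise comparison described above can be sketched in plain Python. This is an illustrative model, not signac-flow's implementation: the `ops` mapping of name to (preconditions, postconditions), the hypothetical `condition_tag` and `operation_graph` helpers, and the convention that `adj[i][j] == 1` means operation j depends on operation i are all assumptions made for this sketch.

```python
from typing import Callable, Dict, List, Tuple

Condition = Callable[..., bool]


def condition_tag(cond: Condition, tags: Dict[Condition, str]) -> object:
    # An explicit tag wins; otherwise fall back to the raw bytecode, so two
    # conditions only compare equal if their compiled code is identical.
    return tags.get(cond, cond.__code__.co_code)


def operation_graph(
    ops: Dict[str, Tuple[List[Condition], List[Condition]]],
    tags: Dict[Condition, str],
) -> List[List[int]]:
    names = list(ops)
    adj = [[0] * len(names) for _ in names]
    for i, name_i in enumerate(names):
        post_i = {condition_tag(c, tags) for c in ops[name_i][1]}
        for j, name_j in enumerate(names):
            if i == j:
                continue
            pre_j = {condition_tag(c, tags) for c in ops[name_j][0]}
            # In this sketch, j depends on i if one of j's preconditions
            # matches one of i's postconditions.
            if post_i & pre_j:
                adj[i][j] = 1
    return adj


# Two separately written but identical conditions share the same bytecode.
def hello_done(job):
    return job.get("hello", False)


def hello_done_copy(job):
    return job.get("hello", False)


ops = {
    "hello": ([], [hello_done]),
    "bye": ([hello_done_copy], []),
}
print(operation_graph(ops, tags={}))  # [[0, 1], [0, 0]]
```

Supplying distinct tags for the two functions would break the match, which mirrors how tags let users override the bytecode comparison.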

Given a FlowProject subclass defined in a module project.py, the output graph could be visualized using Matplotlib and NetworkX with the following code:

import numpy as np
import networkx as nx
from matplotlib import pyplot as plt

from project import Project

project = Project()
ops = list(project.operations.keys())
adj = np.asarray(project.detect_operation_graph())

plt.figure()
g = nx.DiGraph(adj)
pos = nx.spring_layout(g)
nx.draw(g, pos)
# Label each node with the name of the corresponding operation.
nx.draw_networkx_labels(g, pos, labels=dict(enumerate(ops)))

plt.show()

Returns:

The adjacency matrix of operation dependencies. A zero indicates no dependency, and a 1 indicates dependency. This can be converted to a graph using NetworkX.

Return type:

list of lists of int

Raises:

RuntimeError – If a condition does not have a tag. This can occur when using functools.partial, and a manually specified condition tag has not been set.

get_job_status(job, ignore_errors=False, cached_status=None)

Return status information about a job.

Parameters:
  • job (Job) – The signac job.

  • ignore_errors (bool) – Whether to ignore exceptions raised during status check. (Default value = False)

  • cached_status (dict) – Dictionary of cached status information. The keys are uniquely generated ids for each group and job. The values are instances of JobStatus. (Default value = None)

Returns:

A dictionary containing status for the requested job.

Return type:

dict

property groups

Get the dictionary of groups that have been added to the workflow.

classmethod label(label_name_or_func=None)

Designate a function as a label function for this class.

For example, we can define a label function like this:

@FlowProject.label
def foo_label(job):
    if job.document.get('foo', False):
        return 'foo-label-text'

The foo-label-text label will now show up in the status view for each job whose foo key evaluates to True.

If the label function returns any type other than str, the label name will be the name of the function if and only if the return value evaluates to True, for example:

@FlowProject.label
def foo_label(job):
    return job.document.get('foo', False)

Finally, a custom label name can be specified by passing it as the first argument to the label() decorator.
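The rule for turning a label function's return value into a displayed label can be modeled in a few lines. `resolve_label` is a hypothetical helper written for illustration, plain dicts stand in for job documents, and the assumption that a name passed to label() replaces the function name is made for this sketch only.

```python
from typing import Any, Callable, Dict, Optional


def resolve_label(func: Callable[[Dict[str, Any]], Any], doc: Dict[str, Any],
                  name: Optional[str] = None) -> Optional[str]:
    value = func(doc)
    if isinstance(value, str):
        # A string return value is used verbatim as the label.
        return value
    # Any other value only decides whether the label is shown; the label is
    # the explicit name if one was given, otherwise the function's __name__.
    return (name or func.__name__) if value else None


def text_label(doc):
    if doc.get("foo", False):
        return "foo-label-text"


def foo_label(doc):
    return doc.get("foo", False)


print(resolve_label(text_label, {"foo": True}))
print(resolve_label(foo_label, {"foo": True}))
print(resolve_label(foo_label, {"foo": True}, name="foo"))
```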

Parameters:

label_name_or_func (str or callable) – A label name or callable. (Default value = None)

Returns:

A decorator for the label function.

Return type:

callable

labels(job)

Yield all labels for the given job.

See also: label().

Parameters:

job (signac.job.Job) – Job handle.

Yields:

str – Label value.

main(parser=None)

Call this function to use the main command line interface.

In most cases, this function is called at the end of the module that defines the project class:

# my_project.py
from flow import FlowProject

class MyProject(FlowProject):
    pass

if __name__ == '__main__':
    MyProject().main()

The project can then be executed on the command line:

$ python my_project.py --help

Parameters:

parser (argparse.ArgumentParser) – The argument parser used to implement the command line interface. If None, a new parser is constructed. (Default value = None)

classmethod make_group(name, submit_options='', run_options='', group_aggregator=None)

Make a FlowGroup named name and return a decorator to make groups.

A FlowGroup is used to group operations together for running and submitting _JobOperations.

Examples

The code below creates a group and adds an operation to that group.

from flow import FlowProject

example_group = FlowProject.make_group('example')


@example_group
@FlowProject.operation
def foo(job):
    return "hello world"

Parameters:
  • name (str) – The name of the FlowGroup.

  • submit_options (str) – The FlowProject.run() options to pass when submitting the group. These will be included in all submissions. Submissions use run commands to execute.

  • run_options (str) – The options to pass to entrypoint exec when running the group. Specifying this will cause the operation to be forked even if it otherwise would run in the current Python interpreter.

  • group_aggregator (aggregator) – An instance of aggregator to associate with the FlowGroup. If None, no aggregation takes place (Default value = None).

Returns:

The created group.

Return type:

FlowGroupEntry

property operations

Get the dictionary of operations that have been added to the workflow.

print_status(jobs=None, overview=True, overview_max_lines=None, detailed=False, parameters=None, param_max_width=None, expand=False, all_ops=False, only_incomplete=False, dump_json=False, unroll=True, compact=False, pretty=False, file=None, err=None, ignore_errors=False, template=None, profile=False, eligible_jobs_max_lines=None, output_format='terminal', operation=None)

Print the status of the project.

Parameters:
  • jobs (iterable of Job or aggregates) – If None, print status for all jobs/aggregates. If not None, only print status for the given jobs or aggregates (Default value = None).

  • overview (bool) – Display an overview of the project status. (Default value = True)

  • overview_max_lines (int) – Limit the number of overview lines. (Default value = None)

  • detailed (bool) – Print a detailed status of each job. (Default value = False)

  • parameters (list of str) – Print the value of the specified parameters. (Default value = None)

  • param_max_width (int) – Limit the number of characters of parameter columns. (Default value = None)

  • expand (bool) – Present labels and operations in two separate tables. (Default value = False)

  • all_ops (bool) – Include operations that are not eligible to run. (Default value = False)

  • only_incomplete (bool) – Only show jobs that have eligible operations. (Default value = False)

  • dump_json (bool) – Output the data as JSON instead of printing the formatted output. (Default value = False)

  • unroll (bool) – Separate columns for jobs and the corresponding operations. (Default value = True)

  • compact (bool) – Print a compact version of the output. (Default value = False)

  • pretty (bool) – Prettify the output. (Default value = False)

  • file (str) – Redirect all output to this file, defaults to sys.stdout.

  • err (str) – Redirect all error output to this file, defaults to sys.stderr.

  • ignore_errors (bool) – Print status even if querying the scheduler fails. (Default value = False)

  • template (str) – User provided Jinja2 template file. (Default value = None)

  • profile (bool) – Show profile result. (Default value = False)

  • eligible_jobs_max_lines (int) – Limit the number of operations, and their eligible job counts, printed in the overview. (Default value = None)

  • output_format (str) – Status output format, supports: ‘terminal’ (default), ‘markdown’ or ‘html’.

  • operation (iterable of str) – Show status of operations that match the provided set of names (interpreted as regular expressions), or all if the argument is None. (Default value = None)

property project_hooks

hooks.Hooks defined for all project operations.

Project-wide hooks are added to an instance of the FlowProject, not the class. For example:

from flow import FlowProject


def finish_hook(operation_name, job):
    print(f"Finished operation {operation_name} on job {job.id}")


if __name__ == "__main__":
    project = FlowProject()
    project.project_hooks.on_exit.append(finish_hook)
    project.main()

run(jobs=None, names=None, pretend=False, np=None, timeout=None, num=None, num_passes=1, progress=False, order=None, ignore_conditions=IgnoreConditions.NONE)

Execute all eligible operations for the given selection.

This function loops until no eligible operations remain, unless it first reaches the maximum number of passes per operation or the maximum total number of executions.

By default there is no limit on the total number of executions, but a specific operation will only be executed once per job. This is to avoid accidental infinite loops when no or faulty postconditions are provided.
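A simplified model of that loop, with num and num_passes as stopping criteria, is sketched below. It is not signac-flow's scheduler: operations are plain (precondition, postcondition, body) tuples over dict "jobs", the `run_loop` helper is hypothetical, and eligibility is evaluated once per pass.

```python
from collections import Counter
from typing import Callable, Dict, List, Optional, Tuple

# An operation is modeled as (precondition, postcondition, body), each taking a job dict.
Operation = Tuple[Callable[[dict], bool], Callable[[dict], bool], Callable[[dict], None]]


def run_loop(ops: Dict[str, Operation], jobs: List[dict],
             num: Optional[int] = None,
             num_passes: Optional[int] = 1) -> List[Tuple[str, int]]:
    passes: Counter = Counter()
    executed: List[Tuple[str, int]] = []
    while True:
        # Eligible: preconditions met, postcondition not met, pass budget left.
        eligible = [
            (name, idx)
            for idx, job in enumerate(jobs)
            for name, (pre, post, _) in ops.items()
            if pre(job) and not post(job)
            and (num_passes is None or passes[(name, idx)] < num_passes)
        ]
        if not eligible:
            return executed  # nothing left to do
        for name, idx in eligible:
            if num is not None and len(executed) >= num:
                return executed  # total execution budget exhausted
            ops[name][2](jobs[idx])
            passes[(name, idx)] += 1
            executed.append((name, idx))


def always(job):
    return True


def never(job):
    return False


def greet(job):
    job["count"] = job.get("count", 0) + 1


# "hello" has no real postcondition, so only num_passes stops it.
ops = {"hello": (always, never, greet)}
print(run_loop(ops, [{}, {}]))  # [('hello', 0), ('hello', 1)]
```

With num_passes=None, the operation above would loop forever, which is exactly the accident the default of one pass per job-operation pair guards against.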

Parameters:
  • jobs (iterable of Job or aggregates) – If None, execute operations for all eligible jobs/aggregates. If not None, only execute operations for the given jobs or aggregates (Default value = None).

  • names (iterable of str) – Only execute operations that match the provided set of names (interpreted as regular expressions), or all if the argument is None. (Default value = None)

  • pretend (bool) – Do not actually execute the operations, but show the commands that would have been executed. (Default value = False)

  • np (int) – Parallelize to the specified number of processors. Use -1 to parallelize over all available processors. The value None uses one processor (Default value = None).

  • timeout (float) – An optional timeout for each operation in seconds after which execution will be cancelled. Use None to indicate no timeout (Default value = None).

  • num (int) – The total number of operations that are executed will not exceed this argument if provided. (Default value = None)

  • num_passes (int or None) – The total number of executions of one specific job-operation pair will not exceed this argument. The default is 1, there is no limit if this argument is None.

  • progress (bool) – Show a progress bar during execution. (Default value = False)

  • order (str, callable, or None) –

    Specify the order of operations. Possible values are:

    • 'none' or None (no specific order)

    • 'by-job' (operations are grouped by job)

    • 'cyclic' (operations are ordered cyclically by job)

    • 'random' (the execution order is shuffled randomly)

    • callable (a callable returning a comparison key for an operation, used to sort operations)

    The default value is 'none', which is equivalent to 'by-job' in the current implementation.

    Note

    Users are advised to not rely on a specific execution order as a substitute for defining the workflow in terms of preconditions and postconditions. However, a specific execution order may be more performant in cases where operations need to access and potentially lock shared resources.

  • ignore_conditions (IgnoreConditions) – Specify if preconditions and/or postconditions are to be ignored when determining eligibility. The default is IgnoreConditions.NONE.

scheduler_jobs(scheduler)

Fetch jobs from the scheduler.

This function will fetch all scheduler jobs from the scheduler and also expand bundled jobs automatically.

However, this function will not automatically filter scheduler jobs which are not associated with this project.

Parameters:

scheduler (Scheduler) – The scheduler instance.

Yields:

ClusterJob – All cluster jobs fetched from the scheduler.

submit(bundle_size=1, jobs=None, names=None, num=None, parallel=False, force=False, ignore_conditions=IgnoreConditions.NONE, ignore_conditions_on_execution=IgnoreConditions.NONE, **kwargs)

Submit function for the project’s main submit interface.

Parameters:
  • bundle_size (int) – Specify the number of operations to be bundled into one submission, defaults to 1.

  • jobs (iterable of Job or aggregates) – If None, submit operations for all eligible jobs/aggregates. If not None, only submit operations for the given jobs or aggregates (Default value = None).

  • names (iterable of str) – Only submit operations that match the provided set of names (interpreted as regular expressions), or all if the argument is None. (Default value = None)

  • num (int) – Limit the total number of submitted operations, defaults to no limit.

  • parallel (bool) – Execute all bundled operations in parallel. (Default value = False)

  • force (bool) – Ignore all warnings or checks during submission, just submit. (Default value = False)

  • ignore_conditions (IgnoreConditions) – Specify if preconditions and/or postconditions are to be ignored when determining eligibility. The default is IgnoreConditions.NONE.

  • ignore_conditions_on_execution (IgnoreConditions) – Specify if preconditions and/or postconditions are to be ignored when determining eligibility after submitting. The default is IgnoreConditions.NONE.

  • **kwargs – Additional keyword arguments forwarded to the environment's submit() method.

FlowProject.operation(func, name=None)

Add operation functions to the class workflow definition.

This object is designed to be used as a decorator, for example:

@FlowProject.operation
def hello(job):
    print('Hello', job)

Directives can also be specified with the directives keyword argument of FlowProject.operation():

@FlowProject.operation(directives={"nranks": 4})
def mpi_hello(job):
    print("hello")

Parameters:
  • func (callable) – The function to add to the workflow.

  • name (str) – The operation name. Uses the name of the function if None. (Default value = None)

  • cmd (bool, optional, keyword-only) – Whether the decorated function returns a shell executable string or not. When True, the returned string is executed by the shell. Defaults to False.

  • with_job (bool, optional, keyword-only) – Whether to change directories to the job workspace when running the job. Defaults to False.

  • directives (dict, optional, keyword-only) – Directives to use for resource requests and execution.

  • aggregator (flow.aggregator, optional, keyword-only) – The aggregator to use for the operation. Default value uses aggregator of size one (i.e. individual jobs).

Returns:

The operation function.

Return type:

callable

Supported Directives:

executable

Return the path to the executable to be used for an operation.

The executable directive expects a string pointing to a valid executable file in the current file system.

By default, this should point to a Python executable (interpreter); however, if the FlowProject path is an empty string, the executable can be a path to an executable Python script. Defaults to sys.executable.

fork

The fork directive can be set to True to enforce that a particular operation is always executed within a subprocess and not within the Python interpreter’s process even if there are no other reasons that would prevent that.

Note

Setting fork=False will not prevent forking if there are other reasons for forking, such as a timeout.

memory

The memory to request for this operation.

The memory value should be a float, int, or string. A valid memory argument is one of the following:

  • A positive numeric value with the suffix “g” or “G”, indicating memory requested in gigabytes. For example:

    @Project.operation(directives={"memory": "4g"})
    def op(job):
        pass

  • A positive numeric value with the suffix “m” or “M”, indicating memory requested in megabytes. For example:

    @Project.operation(directives={"memory": "512m"})
    def op(job):
        pass

  • A positive numeric value with no suffix, indicating memory requested in gigabytes. For example:

    @Project.operation(directives={"memory": "4"})
    def op1(job):
        pass

    @Project.operation(directives={"memory": 4})
    def op2(job):
        pass
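The accepted formats above can be normalized to gigabytes as in the sketch below. `memory_in_gb` is a hypothetical helper, not signac-flow's validator, and the factor of 1024 megabytes per gigabyte is an assumption of this sketch.

```python
from typing import Union


def memory_in_gb(value: Union[int, float, str]) -> float:
    if isinstance(value, str):
        text = value.strip()
        if text[-1:] in ("g", "G"):
            amount = float(text[:-1])
        elif text[-1:] in ("m", "M"):
            # Assumption: 1 GB = 1024 MB.
            amount = float(text[:-1]) / 1024
        else:
            # No suffix: interpreted as gigabytes.
            amount = float(text)
    else:
        amount = float(value)
    if amount <= 0:
        raise ValueError(f"memory must be positive, got {value!r}")
    return amount


for spec in ("4g", "512m", "4", 4):
    print(repr(spec), "->", memory_in_gb(spec), "GB")
```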

ngpu

The number of GPUs to use for this operation.

Expects a nonnegative integer. Defaults to 0.

np

The total number of CPU cores to request for a given operation.

Expects a natural number (i.e. an integer >= 1). This directive introspects into the “nranks” or “omp_num_threads” directives and uses their product if it is greater than the current set value. Defaults to 1.

Warning:

Generally, for multicore applications, specify either this directive (when not using MPI) or “nranks” and “omp_num_threads”, but not both.
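The interplay between np, nranks, and omp_num_threads can be expressed as a small function. `effective_np` is hypothetical, and treating an unset (zero) nranks or omp_num_threads as 1 in the product is an assumption of this sketch; otherwise the product of the defaults, 0, could never exceed np.

```python
def effective_np(np: int = 1, nranks: int = 0, omp_num_threads: int = 0) -> int:
    # Assumption: a value of 0 (the default) means "not set" and counts as 1.
    total = (nranks or 1) * (omp_num_threads or 1)
    # The product only takes effect if it exceeds the requested np.
    return max(np, total)


print(effective_np(nranks=4, omp_num_threads=2))  # 4 ranks x 2 threads
print(effective_np(np=16, nranks=4))              # explicit np is larger
```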

nranks

The number of MPI ranks to use for this operation. Defaults to 0.

Expects a nonnegative integer.

omp_num_threads

The number of OpenMP threads to use for this operation. Defaults to 0.

When used in conjunction with “nranks” this specifies the OpenMP threads per rank.

Using this directive sets the environment variable OMP_NUM_THREADS in the operation’s execution environment.

Expects a nonnegative integer.

processor_fraction

Fraction of a resource to use on a single operation.

If set to 0.5 for a bundled job with 20 operations (all with ‘np’ set to 1), 10 CPUs will be used. Defaults to 1.

Note

This can be particularly useful on Stampede2’s launcher.

walltime

The number of hours to request for executing this job.

This directive expects a float representing the walltime in hours. Fractional values are supported. For example, a value of 0.5 will request 30 minutes of walltime. If no walltimes are requested, the submission will not specify a walltime in the output script. Some schedulers have a default value that will be used.

For example:

@Project.operation(directives={"walltime": 24})
def op(job):
    # This operation takes 1 day to run
    pass
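Because the directive is given in hours, fractional values map directly onto clock time. The hypothetical `to_hms` helper below uses the standard library's datetime.timedelta to show the conversion into a common HH:MM:SS form; the exact format written into a submission script depends on the scheduler template.

```python
from datetime import timedelta


def to_hms(hours: float) -> str:
    # Convert a walltime in (possibly fractional) hours to HH:MM:SS.
    total = round(timedelta(hours=hours).total_seconds())
    h, rem = divmod(total, 3600)
    m, s = divmod(rem, 60)
    return f"{h:02d}:{m:02d}:{s:02d}"


for hours in (0.5, 1.25, 24):
    print(hours, "->", to_hms(hours))
```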

FlowProject.operation_hooks(hook_func, trigger)

Add hooks to an operation.

This object is designed to be used as a decorator. The example below shows an operation-level hook that prints the operation name and job id at the start of the operation execution.

def start_hook(operation_name, job):
    print(f"Starting operation {operation_name} on job {job.id}.")

@FlowProject.operation_hooks.on_start(start_hook)
@FlowProject.operation
def foo(job):
    pass

A hook is a function that is called at specific points during the execution of a job operation. In the example above, the start_hook hook function is executed before the operation foo runs. Hooks can also run after an operation finishes, when an operation exits with error, or when an operation exits without error.

The available triggers are on_start, on_exit, on_exception, and on_success which run when the operation starts, completes, fails, and succeeds respectively.

Parameters:
  • hook_func (callable) – The function that will be executed at a specified point.

  • trigger (string) – The point at which the hook function is executed.

classmethod operation_hooks.on_exception(hook_func)

Add a hook function triggered after the operation exits with an error.

classmethod operation_hooks.on_exit(hook_func)

Add a hook function triggered after the operation exits.

The hook is triggered regardless of whether the operation exits with or without an error.

classmethod operation_hooks.on_start(hook_func)

Add a hook function triggered before an operation starts.

classmethod operation_hooks.on_success(hook_func)

Add a hook function triggered after the operation exits without error.
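The relative timing of the four triggers can be modeled with try/except/else/finally. This is a sketch rather than signac-flow's dispatcher: `run_with_hooks` and the `hooks` dict are hypothetical, the (operation_name, error, job) signature for exception hooks is an assumption, and so is the order of on_success relative to on_exit. Only "on_start before the operation" and "on_exit regardless of outcome" come from the descriptions above.

```python
from typing import Callable, Dict, List


def run_with_hooks(name: str, func: Callable, job, hooks: Dict[str, List[Callable]]):
    for hook in hooks.get("on_start", []):
        hook(name, job)
    try:
        result = func(job)
    except Exception as error:
        for hook in hooks.get("on_exception", []):
            hook(name, error, job)  # assumed signature for exception hooks
        raise
    else:
        for hook in hooks.get("on_success", []):
            hook(name, job)
        return result
    finally:
        # Runs whether the operation raised or not.
        for hook in hooks.get("on_exit", []):
            hook(name, job)


events = []
hooks = {
    "on_start": [lambda n, j: events.append(("start", n))],
    "on_success": [lambda n, j: events.append(("success", n))],
    "on_exception": [lambda n, e, j: events.append(("exception", n, str(e)))],
    "on_exit": [lambda n, j: events.append(("exit", n))],
}


def foo(job):
    return "done"


def bad(job):
    raise RuntimeError("boom")


run_with_hooks("foo", foo, "job-1", hooks)
try:
    run_with_hooks("bad", bad, "job-2", hooks)
except RuntimeError:
    pass
print(events)
```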

FlowProject.post(tag=None)

Define and evaluate postconditions for operations.

A postcondition is a function accepting one or more jobs as positional arguments (*jobs) that must evaluate to True for this operation to be considered complete. For example:

from flow import FlowProject, aggregator


class Project(FlowProject):
    pass


@Project.post(lambda job: job.doc.get('bye'))
@Project.operation
def bye(job):
    print('bye', job)
    job.doc.bye = True


@Project.post(lambda *jobs: all("bye_all" in job.doc for job in jobs))
@Project.operation(aggregator=aggregator())
def bye_all(*jobs):
    print('bye', jobs)
    for job in jobs:
        job.doc.bye_all = True

The bye operation would be considered complete and therefore no longer eligible for execution once the ‘bye’ key in the job document evaluates to True. Similarly, the bye_all operation would be considered complete and therefore no longer eligible for execution only if the ‘bye_all’ key is present in all of the jobs passed.

An optional tag may be associated with the condition. These tags are used by detect_operation_graph() when comparing conditions for equality. The tag defaults to the bytecode of the function.

classmethod post.copy_from(*other_funcs)

Copy postconditions from other operation(s).

True if and only if all postconditions of other operation function(s) are met.

classmethod post.false(key)

Evaluate if a document key is False for the job(s).

Returns True if the specified key is present in the job document(s) and evaluates to False.

classmethod post.isfile(filename)

Determine if the specified file exists for the job(s).

classmethod post.never(func)

Return False.

classmethod post.not_(condition)

Return not condition(*jobs) for the provided condition function.

classmethod post.true(key)

Evaluate if a document key is True for the job(s).

Returns True if the specified key is present in the job document(s) and evaluates to True.

FlowProject.pre(tag=None)

Define and evaluate preconditions for operations.

A precondition is a function accepting one or more jobs as positional arguments (*jobs) that must evaluate to True for this operation to be eligible for execution. For example:

from flow import FlowProject, aggregator


class Project(FlowProject):
    pass


@Project.pre(lambda job: not job.doc.get('hello'))
@Project.operation
def hello(job):
    print('hello', job)
    job.doc.hello = True


@Project.pre(lambda *jobs: all("hi_all" not in job.doc for job in jobs))
@Project.operation(aggregator=aggregator())
def hi_all(*jobs):
    print('hi', jobs)
    for job in jobs:
        job.doc.hi_all = True

The hello operation would only execute if the ‘hello’ key in the job document does not evaluate to True. Similarly, the hi_all operation would execute only if the ‘hi_all’ key is not present in any of the jobs passed.

An optional tag may be associated with the condition. These tags are used by detect_operation_graph() when comparing conditions for equality. The tag defaults to the bytecode of the function.

classmethod pre.after(*other_funcs)

Precondition to run an operation after other operations.

True if and only if all postconditions of other operation function(s) are met.

classmethod pre.copy_from(*other_funcs)

Copy preconditions from other operation(s).

True if and only if all preconditions of other operation function(s) are met.

classmethod pre.false(key)

Evaluate if a document key is False for the job(s).

Returns True if the specified key is present in the job document(s) and evaluates to False.

classmethod pre.isfile(filename)

Determine if the specified file exists for the job(s).

classmethod pre.never(func)

Return False.

classmethod pre.not_(condition)

Return not condition(*jobs) for the provided condition function.

classmethod pre.true(key)

Evaluate if a document key is True for the job(s).

Returns True if the specified key is present in the job document(s) and evaluates to True.
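The document-key helpers and not_ can be approximated with plain closures over dict-like documents. `doc_true`, `doc_false`, and `not_` here are hypothetical stand-ins that mirror the descriptions above; treating an aggregate as satisfying the condition only when every job does is an assumption of this sketch.

```python
from typing import Callable

Condition = Callable[..., bool]


def doc_true(key: str) -> Condition:
    # True if the key is present and truthy in every job document.
    return lambda *docs: all(key in d and bool(d[key]) for d in docs)


def doc_false(key: str) -> Condition:
    # True if the key is present and falsy in every job document.
    return lambda *docs: all(key in d and not d[key] for d in docs)


def not_(condition: Condition) -> Condition:
    # Negate any condition, forwarding all jobs unchanged.
    return lambda *docs: not condition(*docs)


a, b = {"done": True}, {"done": False}
print(doc_true("done")(a), doc_true("done")(a, b), doc_false("done")(b))
```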

class flow.IgnoreConditions(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)

Bases: IntFlag

Flags that determine which conditions are used to determine job eligibility.

ALL = 3

Ignore all conditions.

NONE = 0

Check all conditions.

POST = 2

Ignore postconditions.

PRE = 1

Ignore preconditions.
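Because IgnoreConditions is an IntFlag, its values combine bitwise. The stand-in enum below reproduces the values listed above so the example runs without signac-flow installed; `check_pre` and `check_post` are hypothetical helpers showing how a flag value could be tested.

```python
from enum import IntFlag


class IgnoreConditions(IntFlag):
    # Same values as documented for flow.IgnoreConditions.
    NONE = 0
    PRE = 1
    POST = 2
    ALL = 3


def check_pre(flags: IgnoreConditions) -> bool:
    # Preconditions are evaluated unless the PRE bit is set.
    return not (flags & IgnoreConditions.PRE)


def check_post(flags: IgnoreConditions) -> bool:
    # Postconditions are evaluated unless the POST bit is set.
    return not (flags & IgnoreConditions.POST)


print(IgnoreConditions.PRE | IgnoreConditions.POST == IgnoreConditions.ALL)
```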

Operations and Status

class flow.project.BaseFlowOperation(pre=None, post=None)

Bases: object

A BaseFlowOperation represents a data space operation acting on any job.

Every BaseFlowOperation is associated with a specific command.

Preconditions (pre) and postconditions (post) can be used to trigger an operation only when certain conditions are met. Conditions are callables that expect one or more instances of Job as positional arguments (*jobs) and return either True or False.

An operation is considered “eligible” for execution when all preconditions are met and when at least one of the postconditions is not met. Preconditions are always met when the list of preconditions is empty. Postconditions are never met when the list of postconditions is empty.
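The eligibility rule, including the empty-list cases, fits in a few lines. `is_eligible` is a hypothetical helper written for illustration, not a method of BaseFlowOperation.

```python
from typing import Callable, Sequence

Condition = Callable[..., bool]


def is_eligible(pre: Sequence[Condition], post: Sequence[Condition], *jobs) -> bool:
    # all() of an empty sequence is True, so an empty pre list is always met.
    pre_met = all(cond(*jobs) for cond in pre)
    # An empty post list is never met; otherwise every postcondition must hold.
    post_met = bool(post) and all(cond(*jobs) for cond in post)
    return pre_met and not post_met


job = {}
print(is_eligible([], [], job))  # no conditions at all: eligible
```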

Note

This class should not be instantiated directly.

Parameters:
  • pre (sequence of callables) – List of preconditions.

  • post (sequence of callables) – List of postconditions.

class flow.project.FlowOperation(op_func, pre=None, post=None)

Bases: BaseFlowOperation

An operation that executes a Python function.

All operations not declared with FlowProject.operation(cmd=True) use this class. The callable op_func should be a function of one or more instances of Job.

Note

This class should not be instantiated directly.

Parameters:
  • op_func (callable) – A callable function of *jobs.

  • pre (sequence of callables) – List of preconditions.

  • post (sequence of callables) – List of postconditions.

__call__(*jobs)

Call the operation on the provided jobs.

Parameters:

*jobs (One or more instances of Job.) – The jobs passed to the operation.

Returns:

The result of the operation function.

Return type:

object

class flow.project.FlowCmdOperation(cmd, pre=None, post=None)

Bases: BaseFlowOperation

An operation that executes a shell command.

When an operation is declared with FlowProject.operation(cmd=True), it is instantiated as a FlowCmdOperation. The operation should be a function of one or more positional arguments that are instances of Job. The command (cmd) may either be a callable that expects one or more instances of Job as positional arguments and returns a string containing valid shell commands, or the string of commands itself. In either case, the resulting string may contain any attributes of the job (or jobs) placed in curly braces, which will then be substituted by Python string formatting.

Note

This class should not be instantiated directly.

Parameters:
  • cmd (str or callable) – The command to execute the operation. Callable values will be provided one or more positional arguments that are instances of Job. String values will be formatted with cmd.format(jobs=jobs) where jobs is a tuple of Job, or cmd.format(jobs=jobs, job=job) if only one job is provided.

  • pre (sequence of callables) – List of preconditions.

  • post (sequence of callables) – List of postconditions.
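The string-formatting rule for cmd values can be seen with a stand-in job type. `FakeJob` and `format_cmd` are hypothetical and only mimic the attribute access that str.format performs; a real Job exposes many more attributes.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FakeJob:
    # Minimal stand-in exposing the attributes used in the templates below.
    id: str
    path: str


def format_cmd(cmd: str, *jobs: FakeJob) -> str:
    # Mirrors the documented rule: "job" is only available for a single job.
    if len(jobs) == 1:
        return cmd.format(jobs=jobs, job=jobs[0])
    return cmd.format(jobs=jobs)


one = FakeJob("abc123", "/workspace/abc123")
two = FakeJob("def456", "/workspace/def456")
print(format_cmd("echo {job.id} > {job.path}/out.txt", one))
print(format_cmd("diff {jobs[0].path}/out.txt {jobs[1].path}/out.txt", one, two))
```

Under this rule, a template that uses {job} raises KeyError when several jobs are passed, so aggregate commands have to index into {jobs}.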

__call__(*jobs)

Return the command formatted with the supplied job(s).

Labels

class flow.label(name=None)

Bases: object

Decorate a FlowProject method as a label function.

For example:

from flow import FlowProject, label


class MyProject(FlowProject):

    @label()
    def foo(self, job):
        return True

__call__(func)

Add the function as a label.

This call operator allows the class to be used as a decorator.

Parameters:

func (callable) – The function to decorate.

Returns:

The decorated function.

Return type:

callable

class flow.staticlabel(name=None)

Bases: label

A label decorator for staticmethods.

This decorator implies “staticmethod”!

__call__(func)

Add the function as a label.

This call operator allows the class to be used as a decorator.

Parameters:

func (callable) – The function to decorate.

Returns:

The decorated function.

Return type:

callable

class flow.classlabel(name=None)

Bases: label

A label decorator for classmethods.

This decorator implies “classmethod”!

__call__(func)

Add the function as a label.

This call operator allows the class to be used as a decorator.

Parameters:

func (callable) – The function to decorate.

Returns:

The decorated function.

Return type:

callable

flow.init()

flow.init(alias=None, template=None, path=None)

Initialize a templated FlowProject module.

Parameters:
  • alias (str) – Python identifier used as a file name for the template output. Uses "project" if None. (Default value = None)

  • template (str) –

    Name of the template to use. Built-in templates are:

    • "minimal"

    • "example"

    • "testing"

    Uses "minimal" if None. (Default value = None)

  • path (str) – Directory where the output file is placed. Uses the current working directory if None. (Default value = None)

Returns:

List of file names created.

Return type:

list

flow.get_environment()

flow.get_environment(test=False)

Attempt to detect the present environment.

This function iterates through all defined ComputeEnvironment classes in reverse order of definition and returns the first environment whose is_present() method returns True.

Parameters:
  • test (bool) – Whether to return the TestEnvironment. (Default value = False)

  • import_configured (bool) – Whether to import environments specified in the flow configuration. (Default value = True)

Returns:

The detected environment class.

Return type:

ComputeEnvironment
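The reverse-order search described above can be modeled in a few lines. For illustration, this sketch assumes that the always-present default environment is registered first, so any more specific environment defined later takes precedence. The environment classes, REGISTRY, and detect_environment are hypothetical and are not part of flow.

```python
class EnvironmentModel:
    """Hypothetical stand-in for ComputeEnvironment."""

    @classmethod
    def is_present(cls):
        return False


class DefaultEnv(EnvironmentModel):
    # Like StandardEnvironment: always present.
    @classmethod
    def is_present(cls):
        return True


class ClusterEnv(EnvironmentModel):
    detected = True  # pretend this cluster was detected

    @classmethod
    def is_present(cls):
        return cls.detected


class OtherClusterEnv(EnvironmentModel):
    pass  # inherits is_present(), which returns False


# Assumed registration order: the always-present default is defined first.
REGISTRY = [DefaultEnv, ClusterEnv, OtherClusterEnv]


def detect_environment(registry):
    """Return the most recently defined environment whose is_present() is True."""
    for env in reversed(registry):
        if env.is_present():
            return env
    return None


print(detect_environment(REGISTRY).__name__)  # ClusterEnv
ClusterEnv.detected = False
print(detect_environment(REGISTRY).__name__)  # DefaultEnv
```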

The FlowGroup

class flow.project.FlowGroup(name, operations=None, operation_directives=None, submit_options='', run_options='')

Bases: object

A FlowGroup represents a subset of a workflow for a project.

A FlowGroup is associated with one or more instances of BaseFlowOperation.

Examples

In the example below, the directives will be {'nranks': 4} for op1 and {'nranks': 2, 'executable': 'python3'} for op2.

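from flow import FlowProject

group = FlowProject.make_group(name='example_group')

# Directives passed to the group apply only when op1 runs as part of the group.
@group(directives={"nranks": 4})
@FlowProject.operation(directives={"nranks": 2, "executable": "python3"})
def op1(job):
    pass

# Without group directives, op2 keeps the directives of its own operation.
@group
@FlowProject.operation(directives={"nranks": 2, "executable": "python3"})
def op2(job):
    pass

Parameters: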
  • name (str) – The name of the group to be used when calling from the command line.

  • operations (dict) – A dictionary of operations where the keys are operation names and each value is a BaseFlowOperation.

  • operation_directives (dict) – A dictionary of additional parameters that provide instructions on how to execute a particular operation, e.g., specifically required resources. Operation names are keys and the dictionaries of directives are values. If an operation does not have directives specified, then the directives of the singleton group containing that operation are used. To prevent this, set the directives to an empty dictionary for that operation.

  • submit_options (str) – The FlowProject.run() options to pass when submitting the group. These will be included in all submissions. Submissions use run commands to execute.

  • run_options (str) – The options to pass to entrypoint exec when running the group. Specifying this will cause the operation to be forked even if it otherwise would run in the current Python interpreter.
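The fallback rule for operation_directives can be sketched with plain dictionaries. The sketch also shows why an empty dictionary behaves differently from having no entry at all. resolve_directives and both dictionaries are hypothetical names. The sketch mirrors only the rule stated above, not the library's code.

```python
def resolve_directives(op_name, operation_directives, singleton_directives):
    """Model the documented fallback rule for a FlowGroup's operation directives."""
    directives = operation_directives.get(op_name)
    if directives is None:
        # Nothing specified for this operation in the group: fall back to the
        # directives of the singleton group containing only this operation.
        return singleton_directives[op_name]
    # Any explicit dict, including an empty one, is used as-is.
    return directives


singleton = {"op1": {"nranks": 2}, "op2": {"nranks": 2}, "op3": {"nranks": 2}}
group_dirs = {"op1": {"nranks": 4}, "op3": {}}

print(resolve_directives("op1", group_dirs, singleton))  # {'nranks': 4}
print(resolve_directives("op2", group_dirs, singleton))  # {'nranks': 2}
print(resolve_directives("op3", group_dirs, singleton))  # {}
```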

add_operation(name, operation, directives=None)

Add an operation to the FlowGroup.

Parameters:
  • name (str) – The name of the operation.

  • operation (BaseFlowOperation) – The workflow operation to add to the FlowGroup.

  • directives (dict) – The operation specific directives. (Default value = None)

isdisjoint(group)

Return whether two groups are disjoint.

Groups are disjoint if they do not share any common operations.

Parameters:

group (FlowGroup) – The other FlowGroup to compare to.

Returns:

Returns True if group and self share no operations, otherwise returns False.

Return type:

bool

class flow.project.FlowGroupEntry(name, project, submit_options='', run_options='', group_aggregator=None)

Bases: object

A FlowGroupEntry registers operations for inclusion into a FlowGroup.

Application developers should not directly instantiate this class, but use make_group() instead.

Operation functions can be marked for inclusion into a FlowGroup by decorating the functions with a corresponding FlowGroupEntry. If the operation requires group-specific directives, calling the FlowGroupEntry with the keyword argument directives sets directives exclusively for the group. These override the default directives specified by FlowProject.operation() when the operation is executed through the group.

Parameters:
  • name (str) – The name of the FlowGroup to be created.

  • project (flow.FlowProject) – The project the group is associated with.

  • submit_options (str) – The FlowProject.run() options to pass when submitting the group. These will be included in all submissions. Submissions use run commands to execute.

  • run_options (str) – The options to pass to entrypoint exec when running the group. Specifying this will cause the operation to be forked even if it otherwise would run in the current Python interpreter.

  • group_aggregator (aggregator) – aggregator object associated with the FlowGroup (Default value = None).

__call__(func=None, /, *, directives=None)

Add the function into the group’s operations.

This call operator allows the class to be used as a decorator.

Parameters:
  • func (callable) – The function to decorate.

  • directives (dict) – Directives to use for resource requests and execution. The directives specified in this decorator are only applied when executing the operation through the FlowGroup. To apply directives to an individual operation executed outside of the group, see FlowProject.operation().

Returns:

The decorated function.

Return type:

callable

Aggregation

class flow.aggregator(aggregator_function=None, sort_by=None, sort_ascending=True, select=None)

Bases: object

Class for generating aggregates for use in operations.

By default, if the aggregator_function is None, an aggregate of all jobs will be created.

Examples

The code block below defines an operation that prints the number of jobs in the provided aggregate.

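from flow import FlowProject, aggregator

@FlowProject.operation(aggregator=aggregator())
def foo(*jobs):
    # All jobs are passed to the operation as a single aggregate.
    print(len(jobs))

Parameters: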
  • aggregator_function (callable or None) – A callable that performs aggregation of jobs. It takes in a list of jobs and can return or yield subsets of jobs as an iterable. The default behavior is creating a single aggregate of all jobs.

  • sort_by (str, callable, or None) – Before aggregating, sort the jobs by a given state point parameter. If the argument is a string, jobs are sorted by that state point key. If the argument is callable, this will be passed as the key argument to sorted(). If None, no sorting is performed (Default value = None).

  • sort_ascending (bool) – True if the jobs are to be sorted in ascending order (Default value = True).

  • select (callable or None) – Condition for filtering individual jobs. This is passed as the function argument to filter(). If None, no filtering is performed (Default value = None).
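The sketch below models the default aggregator to show the combined effect of select, sort_by, and sort_ascending. Jobs are represented by hypothetical state point dictionaries. aggregate_all is an illustrative helper, not part of flow. Applying select before sorting is an assumption, but filtering and stable sorting commute, so the result is the same either way.

```python
def aggregate_all(statepoints, sort_by=None, sort_ascending=True, select=None):
    """Model the default aggregator: one aggregate containing all (selected) jobs."""
    jobs = list(statepoints)
    if select is not None:
        # `select` is used like the function argument of filter().
        jobs = list(filter(select, jobs))
    if sort_by is not None:
        # A string is treated as a state point key; a callable is used as the sort key.
        key = (lambda sp: sp[sort_by]) if isinstance(sort_by, str) else sort_by
        jobs = sorted(jobs, key=key, reverse=not sort_ascending)
    # A single aggregate, analogous to the tuple of jobs an operation receives.
    return [tuple(jobs)]


statepoints = [{"T": 3.0}, {"T": 1.0}, {"T": 2.0}, {"T": 0.5}]
result = aggregate_all(statepoints, sort_by="T", select=lambda sp: sp["T"] >= 1.0)
print(result)  # [({'T': 1.0}, {'T': 2.0}, {'T': 3.0})]
```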

classmethod groupby(key, default=None, sort_by=None, sort_ascending=True, select=None)

Aggregate jobs according to matching state point values.

Examples

The code block below provides an example of how to aggregate jobs by a state point parameter "key". If the state point does not contain the key "key", a default value of -1 is used.

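from flow import FlowProject, aggregator

# Jobs whose state point lacks "key" are grouped under the value -1.
@FlowProject.operation(aggregator=aggregator.groupby(key="key", default=-1))
def foo(*jobs):
    print(len(jobs))

Parameters: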
  • key (str, Iterable[str], or callable) – The method by which jobs are grouped. It may be a state point key or an iterable of state point keys whose values define the groupings. It may also be an arbitrary callable of Job when greater flexibility is needed.

  • default (Any) – Default value used for grouping if the key is missing or invalid. If key is an iterable, the default value must be a sequence of equal length. If key is a callable, this argument is ignored. If None, the provided keys must exist for all jobs (Default value = None).

  • sort_by (str, callable, or None) – Before aggregating, sort the jobs by a given state point parameter. If the argument is a string, jobs are sorted by that state point key. If the argument is callable, this will be passed as the key argument to sorted(). If None, no sorting is performed (Default value = None).

  • sort_ascending (bool) – True if the jobs are to be sorted in ascending order (Default value = True).

  • select (callable or None) – Condition for filtering individual jobs. This is passed as the function argument to filter(). If None, no filtering is performed (Default value = None).

Returns:

aggregator – The groupby() aggregator.

Return type:

aggregator
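The grouping rule for a single string key, including the default value, can be sketched with a dictionary of lists. groupby_model is a hypothetical helper operating on state point dictionaries. It does not model iterable keys, callable keys, or the order in which flow produces the resulting aggregates.

```python
from collections import defaultdict


def groupby_model(statepoints, key, default=None):
    """Group state points by the value of a single key (illustrative model)."""
    groups = defaultdict(list)
    for sp in statepoints:
        if key in sp:
            value = sp[key]
        elif default is not None:
            value = default
        else:
            # Without a default, the key must exist for every job.
            raise KeyError(key)
        groups[value].append(sp)
    # Each dictionary value corresponds to one aggregate.
    return {value: tuple(members) for value, members in groups.items()}


statepoints = [{"key": 1}, {"key": 2}, {"key": 1, "x": 0}, {"x": 5}]
aggregates = groupby_model(statepoints, "key", default=-1)
print(sorted(len(agg) for agg in aggregates.values()))  # [1, 1, 2]
```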

classmethod groupsof(num=1, sort_by=None, sort_ascending=True, select=None)

Aggregate jobs into groupings of a given size.

By default, creates aggregates consisting of a single job.

If the number of jobs present in the project is not divisible by the number provided by the user, the last aggregate will be smaller and contain the remaining jobs. For instance, if 10 jobs are present in a project and they are aggregated in groups of 3, then the generated aggregates will have lengths 3, 3, 3, and 1.

Examples

The code block below shows how to aggregate jobs in groups of 2.

from flow import FlowProject, aggregator

# Each call of foo receives at most two jobs.
@FlowProject.operation(aggregator=aggregator.groupsof(num=2))
def foo(*jobs):
    print(len(jobs))

Parameters:
  • num (int) – The default size of aggregates. The final aggregate contains the remaining jobs and may have fewer than num jobs.

  • sort_by (str, callable, or None) – Before aggregating, sort the jobs by a given state point parameter. If the argument is a string, jobs are sorted by that state point key. If the argument is callable, this will be passed as the key argument to sorted(). If None, no sorting is performed (Default value = None).

  • sort_ascending (bool) – True if the jobs are to be sorted in ascending order (Default value = True).

  • select (callable or None) – Condition for filtering individual jobs. This is passed as the function argument to filter(). If None, no filtering is performed (Default value = None).

Returns:

aggregator – The groupsof() aggregator.

Return type:

aggregator
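The sketch below reproduces the chunking behavior described above, including the example of 10 jobs in groups of 3. Integers stand in for jobs, and groupsof_model is a hypothetical helper rather than the library's implementation.

```python
def groupsof_model(items, num=1):
    """Split items into consecutive groups of size num; the last may be smaller."""
    items = list(items)
    return [tuple(items[i:i + num]) for i in range(0, len(items), num)]


aggregates = groupsof_model(range(10), num=3)
print([len(agg) for agg in aggregates])  # [3, 3, 3, 1]
```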

flow.get_aggregate_id(aggregate)

Generate aggregate id for an aggregate of jobs.

The aggregate id is a unique hash identifying a tuple of jobs. The aggregate id is sensitive to the order of the jobs in the aggregate. The id of an aggregate containing one job is that job’s id (the hash of its state point).

Parameters:

aggregate (tuple of Job) – Aggregate of signac jobs.

Returns:

The generated aggregate id.

Return type:

str

Compute Environments

Detection of compute environments.

This module provides the ComputeEnvironment class, which can be subclassed to automatically detect specific computational environments.

This enables users to adjust their workflow based on the present environment, e.g., to adapt scheduler submission scripts.

flow.environment.template_filter(func)

Decorate a function as a ComputeEnvironment template filter.

This decorator is applied to methods defined in a subclass of ComputeEnvironment that are used in that environment’s templates. The decorated function becomes a class method that is available as a jinja2 filter in templates rendered by a FlowProject with that ComputeEnvironment.

Parameters:

func (callable) – Function to decorate.

Returns:

Decorated function.

Return type:

callable

class flow.environment.ComputeEnvironment

Bases: object

Define computational environments.

The ComputeEnvironment class enables automatic detection of specific environments so that workflows can be adjusted programmatically for each of them.

The default method for the detection of a specific environment is to provide a regular expression matching the environment’s hostname. For example, if the hostname is my-server.com, one could identify the environment by setting the hostname_pattern to 'my-server'.
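The sketch below shows how a regular expression such as 'my-server' can identify a host like my-server.com. It assumes re.match semantics, which match at the beginning of the hostname, and it does not show how flow obtains the hostname. hostname_matches is a hypothetical helper. In a real subclass, the pattern would be set through the hostname_pattern attribute mentioned above.

```python
import re


def hostname_matches(hostname_pattern, hostname):
    """Return True if the pattern matches at the start of the hostname."""
    return hostname_pattern is not None and re.match(hostname_pattern, hostname) is not None


print(hostname_matches("my-server", "my-server.com"))     # True
print(hostname_matches("my-server", "login1.other.org"))  # False
print(hostname_matches(None, "my-server.com"))            # False
```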

classmethod add_args(parser)

Add arguments related to this compute environment to an argument parser.

Parameters:

parser (argparse.ArgumentParser) – The argument parser where arguments will be added.

classmethod get_prefix(operation, parallel=False, mpi_prefix=None, cmd_prefix=None)

Template filter generating a command prefix from directives.

Parameters:
  • operation (flow.project._JobOperation) – The operation to be prefixed.

  • parallel (bool) – If True, operations are assumed to be executed in parallel, which means that the total number of tasks is the sum of the tasks of all operations instead of the maximum number of tasks of any single operation. Default is set to False.

  • mpi_prefix (str) – User defined mpi_prefix string. Default is set to None. This will be deprecated and removed in the future.

  • cmd_prefix (str) – User defined cmd_prefix string. Default is set to None. This will be deprecated and removed in the future.

Returns:

The prefix to be added to the operation’s command.

Return type:

str
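The rule stated for the parallel parameter can be illustrated with a hypothetical helper, total_tasks. The per-operation task counts are invented. The sketch does not model how flow derives each operation's task count from its directives.

```python
def total_tasks(tasks_per_operation, parallel=False):
    """Count tasks as described for get_prefix(): sum if parallel, else maximum."""
    if parallel:
        # Operations run simultaneously, so their task counts add up.
        return sum(tasks_per_operation)
    # Operations do not run simultaneously, so the largest one sets the requirement.
    return max(tasks_per_operation)


counts = [4, 2, 1]  # hypothetical task counts of three operations
print(total_tasks(counts))                 # 4
print(total_tasks(counts, parallel=True))  # 7
```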

classmethod get_scheduler()

Return an environment-specific scheduler driver.

The returned scheduler class provides a standardized interface to different scheduler implementations.

classmethod is_present()

Determine whether this specific compute environment is present.

The default method for environment detection is to match a hostname pattern or to delegate the environment check to the associated scheduler type.

classmethod submit(script, flags=None, *args, **kwargs)

Submit a job submission script to the environment’s scheduler.

Scripts should be submitted to the environment rather than directly to the scheduler, to allow for environment-specific post-processing.

Parameters:
  • script (str) – The script to submit.

  • flags (list) – A list of additional flags to provide to the scheduler. (Default value = None)

  • *args – Positional arguments forwarded to the scheduler’s submit method.

  • **kwargs – Keyword arguments forwarded to the scheduler’s submit method.

Returns:

Status of job, if submitted.

Return type:

JobStatus.submitted or None

class flow.environment.StandardEnvironment

Bases: ComputeEnvironment

Default environment which is always present.

classmethod is_present()

Determine whether this specific compute environment is present.

The StandardEnvironment is always present, so this returns True.

class flow.environment.TestEnvironment

Bases: ComputeEnvironment

Environment used for testing.

The test environment will print a mocked submission script and submission commands to screen. This enables testing of the job submission script generation in environments without a real scheduler.

scheduler_type

alias of FakeScheduler

class flow.environment.SimpleSchedulerEnvironment

Bases: ComputeEnvironment

An environment for the simple-scheduler scheduler.

scheduler_type

alias of SimpleScheduler

flow.environment.registered_environments()

Return a list of registered environments.

Returns:

List of registered environments.

Return type:

list

Schedulers

Defines the API for the scheduling system.

class flow.scheduling.base.JobStatus(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)

Bases: IntEnum

Classifies the job’s execution status.

Group statuses exist to enable status output for individual operations within a larger group.

active = 7

The cluster job is actively running.

error = 8

The cluster job is in an error or failed state.

group_active = 14

The operation is in a group that is actively running.

group_error = 15

The operation is in a group that is in an error or failed state.

group_held = 12

The operation is in a group that is held.

group_inactive = 10

The operation is in a group that is inactive.

This includes states like completed, cancelled, or timed out.

group_queued = 13

The operation is in a group that is queued.

group_registered = 9

The operation is in a group that is registered with the scheduler.

group_submitted = 11

The operation is in a group that has been submitted.

Note that this state is never returned by a scheduler, but is an assumed state immediately after a group containing the operation is submitted.

held = 5

The cluster job is held.

inactive = 3

The cluster job is inactive.

This includes states like completed, cancelled, or timed out.

placeholder = 127

A placeholder state that is used for status rendering when no operations are eligible.

queued = 6

The cluster job is queued.

registered = 2

The cluster job is registered with the scheduler, but no other status is known.

submitted = 4

The cluster job has been submitted.

Note that this state is never returned by a scheduler, but is an assumed state immediately after a cluster job is submitted.

unknown = 1

Unknown cluster job status.

user = 128

All user-defined states must be >=128 in value.
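Because JobStatus derives from IntEnum, its members behave as integers and can be compared and ordered. So that it runs without signac-flow installed, this sketch re-declares the documented values in a hypothetical JobStatusModel. With the package available, flow.scheduling.base.JobStatus itself would be used.

```python
from enum import IntEnum


class JobStatusModel(IntEnum):
    """Stand-in reproducing the documented JobStatus values (not the real class)."""

    unknown = 1
    registered = 2
    inactive = 3
    submitted = 4
    held = 5
    queued = 6
    active = 7
    error = 8
    group_registered = 9
    group_inactive = 10
    group_submitted = 11
    group_held = 12
    group_queued = 13
    group_active = 14
    group_error = 15
    placeholder = 127
    user = 128


# IntEnum members compare like integers, so statuses can be ordered.
print(JobStatusModel.active > JobStatusModel.queued)  # True
print(max(JobStatusModel.held, JobStatusModel.error).name)  # error

# A user-defined status value must be at least 128.
custom_status = JobStatusModel.user + 1
print(custom_status >= JobStatusModel.user)  # True
```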

class flow.scheduling.base.ClusterJob(job_id, status=None)

Bases: object

Class representing a cluster job.

name()

Return the name of the cluster job.

status()

Return the status of the cluster job.

class flow.scheduling.base.Scheduler

Bases: ABC

Abstract base class for schedulers.

abstract classmethod is_present()

Return True if the scheduler is detected.

abstract jobs()

Yield all cluster jobs.

Yields:

ClusterJob – Cluster job.

abstract submit(script, **kwargs)

Submit a job script to the scheduler for execution.
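A custom scheduler would subclass flow.scheduling.base.Scheduler and implement its three abstract members. So that the sketch runs without signac-flow, it mirrors that interface in a hypothetical SchedulerInterface ABC. InMemoryScheduler is likewise invented for illustration. It yields plain (id, script) tuples where a real scheduler would yield ClusterJob objects.

```python
from abc import ABC, abstractmethod


class SchedulerInterface(ABC):
    """Hypothetical mirror of the three abstract members documented for Scheduler."""

    @classmethod
    @abstractmethod
    def is_present(cls):
        """Return True if the scheduler is detected."""

    @abstractmethod
    def jobs(self):
        """Yield all cluster jobs."""

    @abstractmethod
    def submit(self, script, **kwargs):
        """Submit a job script to the scheduler for execution."""


class InMemoryScheduler(SchedulerInterface):
    """Toy scheduler that records submitted scripts instead of running them."""

    def __init__(self):
        self._scripts = []

    @classmethod
    def is_present(cls):
        # Like FakeScheduler, never detected automatically.
        return False

    def jobs(self):
        # Yield (job_id, script) pairs in place of ClusterJob objects.
        yield from enumerate(self._scripts)

    def submit(self, script, **kwargs):
        self._scripts.append(script)
        return True


scheduler = InMemoryScheduler()
scheduler.submit("echo hello")
print(list(scheduler.jobs()))  # [(0, 'echo hello')]
```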

class flow.scheduling.fake_scheduler.FakeScheduler

Bases: Scheduler

Implementation of the abstract Scheduler class for a fake scheduler.

This scheduler does not actually schedule (or execute) any jobs, but it can be used to test the submission workflow.

classmethod is_present()

Return False.

The FakeScheduler is never present unless manually specified.

jobs()

Return None (no jobs are scheduled by the FakeScheduler).

submit(script, **kwargs)

Print the script to screen.

Parameters:
  • script (str) – Script to print.

  • **kwargs – Keyword arguments (ignored).

class flow.scheduling.lsf.LSFScheduler(user=None)

Bases: Scheduler

Implementation of the abstract Scheduler class for LSF schedulers.

This class can submit cluster jobs to an LSF scheduler and query their current status.

Parameters:
  • user (str) – Limit the status information to cluster jobs submitted by user.

  • **kwargs – Forwarded to the parent constructor.

classmethod is_present()

Return True if an LSF scheduler is detected.

jobs()

Yield cluster jobs by querying the scheduler.

submit(script, *, after=None, hold=False, pretend=False, flags=None, **kwargs)

Submit a job script for execution to the scheduler.

Parameters:
  • script (str) – The job script submitted for execution.

  • after (str) – Execute the submitted script after a job with this id has completed. (Default value = None)

  • hold (bool) – Whether to hold the job upon submission. (Default value = False)

  • pretend (bool) – If True, do not actually submit the script, but only simulate the submission. This can be used to test whether the submission would be successful. Note that a successful “pretend” submission does not guarantee that the actual submission will succeed. (Default value = False)

  • flags (list) – Additional arguments to pass through to the scheduler submission command. (Default value = None)

  • **kwargs – Additional keyword arguments (ignored).

Returns:

True if the submission command succeeds (or in pretend mode).

Return type:

bool

Raises:

SubmitError – If the submission command fails.

class flow.scheduling.pbs.PBSScheduler(user=None)

Bases: Scheduler

Implementation of the abstract Scheduler class for PBS schedulers.

This class can submit cluster jobs to a PBS scheduler and query their current status.

Parameters:
  • user (str) – Limit the status information to cluster jobs submitted by user.

  • **kwargs – Forwarded to the parent constructor.

classmethod is_present()

Return True if a PBS scheduler is detected.

jobs()

Yield cluster jobs by querying the scheduler.

submit(script, *, after=None, hold=False, pretend=False, flags=None, **kwargs)

Submit a job script for execution to the scheduler.

Parameters:
  • script (str) – The job script submitted for execution.

  • after (str) – Execute the submitted script after a job with this id has completed. (Default value = None)

  • hold (bool) – Whether to hold the job upon submission. (Default value = False)

  • pretend (bool) – If True, do not actually submit the script, but only simulate the submission. This can be used to test whether the submission would be successful. Note that a successful “pretend” submission does not guarantee that the actual submission will succeed. (Default value = False)

  • flags (list) – Additional arguments to pass through to the scheduler submission command. (Default value = None)

  • **kwargs – Additional keyword arguments (ignored).

Returns:

True if the submission command succeeds (or in pretend mode).

Return type:

bool

Raises:

SubmitError – If the submission command fails.

class flow.scheduling.simple_scheduler.SimpleScheduler

Bases: Scheduler

Implementation of the abstract Scheduler class for the bundled simple-scheduler.

The package signac-flow includes a script in bin/simple-scheduler that is a simple model of a cluster job scheduler. The simple-scheduler script is designed primarily for testing and demonstration.

This class can submit cluster jobs to the built-in simple scheduler and query their current status.

classmethod is_present()

Return True if a SimpleScheduler is detected.

jobs()

Yield cluster jobs by querying the scheduler.

submit(script, *, pretend=False, **kwargs)

Submit a job script for execution to the scheduler.

Parameters:
  • script (str) – The job script submitted for execution.

  • pretend (bool) – If True, do not actually submit the script, but only simulate the submission. This can be used to test whether the submission would be successful. Note that a successful “pretend” submission does not guarantee that the actual submission will succeed. (Default value = False)

  • **kwargs – Keyword arguments (ignored).

Returns:

True if the submission command succeeds (or in pretend mode).

Return type:

bool

Raises:

SubmitError – If the submission command fails.

class flow.scheduling.slurm.SlurmScheduler(user=None)

Bases: Scheduler

Implementation of the abstract Scheduler class for SLURM schedulers.

This class can submit cluster jobs to a SLURM scheduler and query their current status.

Parameters:
  • user (str) – Limit the status information to cluster jobs submitted by user.

  • **kwargs – Forwarded to the parent constructor.

classmethod is_present()

Return True if a SLURM scheduler is detected.

jobs()

Yield cluster jobs by querying the scheduler.

submit(script, *, after=None, hold=False, pretend=False, flags=None, **kwargs)

Submit a job script for execution to the scheduler.

Parameters:
  • script (str) – The job script submitted for execution.

  • after (str) – Execute the submitted script after a job with this id has completed. (Default value = None)

  • hold (bool) – Whether to hold the job upon submission. (Default value = False)

  • pretend (bool) – If True, do not actually submit the script, but only simulate the submission. This can be used to test whether the submission would be successful. Note that a successful “pretend” submission does not guarantee that the actual submission will succeed. (Default value = False)

  • flags (list) – Additional arguments to pass through to the scheduler submission command. (Default value = None)

  • **kwargs – Additional keyword arguments (ignored).

Returns:

True if the submission command succeeds (or in pretend mode).

Return type:

bool

Raises:

SubmitError – If the submission command fails.

Error Classes

Definitions of Exception classes used in this package.

exception flow.errors.ConfigKeyError

Bases: KeyError

Indicates that a config key was not found.

exception flow.errors.DirectivesError

Bases: ValueError

Indicates that a directive was incorrectly set.

exception flow.errors.FlowProjectDefinitionError

Bases: ValueError

Indicates an invalid FlowProject definition.

exception flow.errors.NoSchedulerError

Bases: AttributeError

Indicates that there is no scheduler type defined for an environment class.

exception flow.errors.SubmitError

Bases: RuntimeError

Indicates an error during cluster job submission.

class flow.errors.TemplateError(environment: Environment)

Bases: Extension

Indicates an error in a jinja2 template.

err(msg, caller)

Raise a template error.

parse(parser)

Call err() when a template raises an Exception.

tags: t.Set[str] = {'raise'}

If this extension parses template tags, this is the set of tags it listens to.

exception flow.errors.UserConditionError

Bases: RuntimeError

Indicates an error during evaluation of a condition.

exception flow.errors.UserOperationError

Bases: RuntimeError

Indicates an error during execution of a BaseFlowOperation.
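Each exception class above derives from a built-in exception, so code that already handles the built-in base also catches the more specific error. So that it runs without signac-flow, the sketch uses hypothetical stand-in classes with the documented bases. The submit and try_submit functions are also hypothetical. In practice, the classes would be imported from flow.errors.

```python
class SubmitErrorModel(RuntimeError):
    """Stand-in with the same base class as flow.errors.SubmitError."""


class ConfigKeyErrorModel(KeyError):
    """Stand-in with the same base class as flow.errors.ConfigKeyError."""


def submit(fail):
    """Hypothetical submission that raises like a failed submission command."""
    if fail:
        raise SubmitErrorModel("submission command failed")
    return "submitted"


def try_submit(fail):
    """Call submit() and report the outcome."""
    try:
        return submit(fail)
    except RuntimeError as error:
        # A handler for the base class also catches the more specific error.
        return f"caught {type(error).__name__}: {error}"


print(try_submit(False))  # submitted
print(try_submit(True))   # caught SubmitErrorModel: submission command failed
print(issubclass(ConfigKeyErrorModel, KeyError))  # True
```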