API Reference

This is the API for the signac-flow application.

Command Line Interface

In addition to the Python interface, some core signac-flow functions are accessible directly via the $ flow command.

For more information, please see $ flow --help.

usage: flow [-h] [--debug] [--version] {init} ...

flow provides the basic components to set up workflows for projects as part of
the signac framework.

positional arguments:
  {init}
    init      Initialize a signac-flow project.

optional arguments:
  -h, --help  show this help message and exit
  --debug     Show traceback on error for debugging.
  --version   Display the version number and exit.

The FlowProject

Attributes

FlowProject.ALIASES Default aliases used within the status output.
FlowProject.completed_operations(job) Determine which operations have been completed for job.
FlowProject.get_job_status(job[, …]) Return status information about a job.
FlowProject.label([label_name_or_func]) Designate a function as a label function for this class.
FlowProject.labels(job) Yield all labels for the given job.
FlowProject.main([parser]) Call this function to use the main command line interface.
FlowProject.make_group(name[, options]) Make a FlowGroup named name and return a decorator to make groups.
FlowProject.operation(func[, name]) Add an operation function to the class workflow definition.
FlowProject.operations Get the dictionary of operations that have been added to the workflow.
FlowProject.post(condition[, tag]) Define and evaluate postconditions for operations.
FlowProject.post.copy_from(*other_funcs) Copy postconditions from other operation(s).
FlowProject.post.false(key) Evaluate if a document key is False for the job(s).
FlowProject.post.isfile(filename) Determine if the specified file exists for the job(s).
FlowProject.post.never(func) Return False.
FlowProject.post.not_(condition) Return not condition(*jobs) for the provided condition function.
FlowProject.post.true(key) Evaluate if a document key is True for the job(s).
FlowProject.pre(condition[, tag]) Define and evaluate preconditions for operations.
FlowProject.pre.after(*other_funcs) Precondition to run an operation after other operations.
FlowProject.pre.copy_from(*other_funcs) Copy preconditions from other operation(s).
FlowProject.pre.false(key) Evaluate if a document key is False for the job(s).
FlowProject.pre.isfile(filename) Determine if the specified file exists for the job(s).
FlowProject.pre.never(func) Return False.
FlowProject.pre.not_(condition) Return not condition(*jobs) for the provided condition function.
FlowProject.pre.true(key) Evaluate if a document key is True for the job(s).
FlowProject.run([jobs, names, pretend, np, …]) Execute all eligible operations for the given selection.
FlowProject.scheduler_jobs(scheduler) Fetch jobs from the scheduler.
FlowProject.submit([bundle_size, jobs, …]) Submit function for the project’s main submit interface.
class flow.FlowProject(config=None, environment=None, entrypoint=None)

Bases: signac.contrib.project.Project

A signac project class specialized for workflow management.

This class is used to define, execute, and submit workflows based on operations and conditions.

Users typically interact with this class through its command line interface.

This is a typical example of how to use this class:

from flow import FlowProject

@FlowProject.operation
def hello(job):
    print('hello', job)

FlowProject().main()

Parameters:
  • config (signac.contrib.project._ProjectConfig) – A signac configuration, defaults to the configuration loaded from the current directory.
  • environment (flow.environment.ComputeEnvironment) – An environment to use for scheduler submission. If None, the environment is automatically identified. The default is None.
  • entrypoint (dict) – A dictionary with two possible keys: 'executable' and 'path'. The path represents the location of the script file (the script file must call FlowProject.main()). The executable represents the location of the Python interpreter used to execute BaseFlowOperation instances that are Python functions.
ALIASES = {'active': 'A', 'error': 'E', 'held': 'H', 'inactive': 'I', 'queued': 'Q', 'registered': 'R', 'submitted': 'S', 'unknown': 'U'}

Default aliases used within the status output.

PRINT_STATUS_ALL_VARYING_PARAMETERS = True

This constant can be used to signal that print_status() should automatically show all varying parameters.

completed_operations(job)

Determine which operations have been completed for job.

Parameters: job (Job) – The signac job handle.
Yields: str – The names of the operations that are complete.

detect_operation_graph()

Determine the directed acyclic graph given by operation conditions.

In general, executing a given operation registered with a FlowProject just involves checking the operation’s preconditions and postconditions to determine eligibility. More generally, however, the preconditions and postconditions define a directed acyclic graph that governs the execution of all operations. Visualizing this graph can be useful for finding logic errors in the specified conditions, and having this graph computed also enables additional execution modes. For example, using this graph it is possible to determine exactly what operations need to be executed in order to make the operation eligible so that the task of executing all necessary operations can be automated.

The graph is determined by iterating over all pairs of operations and checking for equality of preconditions and postconditions. The algorithm builds an adjacency matrix based on whether the preconditions for one operation match the postconditions for another. The comparison of operations is conservative; by default, conditions must be composed of identical code to be identified as equal (technically, they must have equivalent bytecode, i.e. cond1.__code__.co_code == cond2.__code__.co_code). Users can specify that conditions should be treated as equal by providing tags to the operations.

Given a FlowProject subclass defined in a module project.py, the output graph could be visualized using Matplotlib and NetworkX with the following code:

import numpy as np
import networkx as nx
from matplotlib import pyplot as plt

from project import Project

project = Project()
ops = list(project.operations)
adj = np.asarray(project.detect_operation_graph())

plt.figure()
g = nx.DiGraph(adj)
pos = nx.spring_layout(g)
nx.draw(g, pos)
# Label each node index with the corresponding operation name.
nx.draw_networkx_labels(g, pos, labels=dict(enumerate(ops)))

plt.show()

Returns: The adjacency matrix of operation dependencies. A 0 indicates no dependency and a 1 indicates a dependency. The matrix can be converted to a graph using NetworkX.
Return type: list of lists of int
Raises: RuntimeError – If a condition does not have a tag. This can occur when using functools.partial without manually setting a condition tag.
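The pairwise comparison described above can be sketched in plain Python. This is a simplified model, not signac-flow's implementation. The Condition and Operation classes and the operation_graph() helper are hypothetical. The convention that mat[i][j] = 1 means "a postcondition of operation i matches a precondition of operation j" is an assumption.

```python
from dataclasses import dataclass, field
from typing import Callable, List, Optional


@dataclass
class Condition:
    """Hypothetical stand-in for a pre- or postcondition with an optional tag."""

    func: Callable
    tag: Optional[str] = None

    def key(self):
        # An explicit tag takes precedence; otherwise compare bytecode.
        if self.tag is not None:
            return self.tag
        code = getattr(self.func, "__code__", None)
        if code is None:
            # e.g. functools.partial objects and builtins have no __code__.
            raise RuntimeError(f"Condition {self.func!r} needs an explicit tag.")
        return code.co_code


@dataclass
class Operation:
    name: str
    pre: List[Condition] = field(default_factory=list)
    post: List[Condition] = field(default_factory=list)


def operation_graph(ops: List[Operation]) -> List[List[int]]:
    """Return an adjacency matrix where mat[i][j] == 1 if ops[i] feeds ops[j]."""
    n = len(ops)
    mat = [[0] * n for _ in range(n)]
    for i, upstream in enumerate(ops):
        post_keys = {cond.key() for cond in upstream.post}
        for j, downstream in enumerate(ops):
            if i != j and any(cond.key() in post_keys for cond in downstream.pre):
                mat[i][j] = 1
    return mat


initialized = Condition(lambda job: job.get("initialized", False), tag="initialized")
computed = Condition(lambda job: job.get("result") is not None, tag="computed")

ops = [
    Operation("initialize", post=[initialized]),
    Operation("compute", pre=[initialized], post=[computed]),
    Operation("analyze", pre=[computed]),
]
print(operation_graph(ops))  # [[0, 1, 0], [0, 0, 1], [0, 0, 0]]
```

Plain co_code comparison ignores constants. Two lambdas that differ only in a literal, such as the document key, compile to identical bytecode, so this sketch treats them as equal unless they are tagged.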

get_job_status(job, ignore_errors=False, cached_status=None)

Return status information about a job.

Parameters:
  • job (Job) – The signac job.
  • ignore_errors (bool) – Whether to ignore exceptions raised during status check. (Default value = False)
  • cached_status (dict) – Dictionary of cached status information. The keys are uniquely generated ids for each group and job. The values are instances of JobStatus. (Default value = None)
Returns:

A dictionary containing status information for the job.

Return type:

dict

groups

Get the dictionary of groups that have been added to the workflow.

classmethod label(label_name_or_func=None)

Designate a function as a label function for this class.

For example, we can define a label function like this:

@FlowProject.label
def foo_label(job):
    if job.document.get('foo', False):
        return 'foo-label-text'

The foo-label-text label will now show up in the status view for each job where the foo key evaluates to True.

If the label function returns any type other than str, the label name will be the name of the function if and only if the return value evaluates to True, for example:

@FlowProject.label
def foo_label(job):
    return job.document.get('foo', False)

Finally, specify a label name by providing it as the first argument to the label() decorator.

Parameters: label_name_or_func (str or callable) – A label name or callable. (Default value = None)
Returns: A decorator for the label function.
Return type: callable
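The naming rules above can be modeled with a small registry. This hypothetical sketch follows the documented behavior. String return values are used verbatim as labels. Other truthy values yield the label name, which defaults to the function name. It is not signac-flow's implementation: LabelRegistry is an invented name, and plain dicts stand in for job documents.

```python
from typing import Callable, Dict, Iterator, Optional


class LabelRegistry:
    """Hypothetical model of FlowProject.label() and FlowProject.labels()."""

    def __init__(self) -> None:
        self._labels: Dict[Callable, Optional[str]] = {}

    def label(self, label_name_or_func=None):
        # Bare usage: @registry.label
        if callable(label_name_or_func):
            self._labels[label_name_or_func] = None
            return label_name_or_func

        # Named usage: @registry.label('some-name')
        def decorator(func):
            self._labels[func] = label_name_or_func
            return func

        return decorator

    def labels(self, job) -> Iterator[str]:
        for func, name in self._labels.items():
            value = func(job)
            if isinstance(value, str):
                yield value  # strings are used as the label text
            elif value:
                yield name if name is not None else func.__name__


registry = LabelRegistry()


@registry.label
def foo_label(job):
    if job.get("foo", False):
        return "foo-label-text"


@registry.label
def bar_label(job):
    return job.get("bar", False)


@registry.label("has-baz")
def baz_label(job):
    return "baz" in job


print(list(registry.labels({"foo": True, "bar": 1, "baz": 0})))
# ['foo-label-text', 'bar_label', 'has-baz']
```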

labels(job)

Yield all labels for the given job.

See also: label().

Parameters: job (signac.contrib.job.Job) – Job handle.
Yields: str – Label value.

main(parser=None)

Call this function to use the main command line interface.

In most cases, this function is called from the project module's main block:

# my_project.py
from flow import FlowProject

class MyProject(FlowProject):
    pass

if __name__ == '__main__':
    MyProject().main()

The project can then be executed on the command line:

$ python my_project.py --help

Parameters: parser (argparse.ArgumentParser) – The argument parser used to implement the command line interface. If None, a new parser is constructed. (Default value = None)

classmethod make_group(name, options='')

Make a FlowGroup named name and return a decorator to make groups.

A FlowGroup is used to group operations together for running and submitting _JobOperations.

Examples

The code below creates a group and adds an operation to that group.

example_group = FlowProject.make_group('example')

@example_group
@FlowProject.operation
def foo(job):
    return "hello world"

Parameters:
  • name (str) – The name of the FlowGroup.
  • options (str) – A string to append to submissions. Can be any valid FlowProject.run() option. (Default value = "")
Returns:

The created group.

Return type:

FlowGroupEntry

classmethod operation(func, name=None)

Add an operation function to the class workflow definition.

This function is designed to be used as a decorator, for example:

@FlowProject.operation
def hello(job):
    print('Hello', job)

Parameters:
  • func (callable) – The function to add to the workflow.
  • name (str) – The operation name. Uses the name of the function if None. (Default value = None)
Returns:

The operation function.

Return type:

callable

operations

Get the dictionary of operations that have been added to the workflow.

print_status(jobs=None, overview=True, overview_max_lines=None, detailed=False, parameters=None, param_max_width=None, expand=False, all_ops=False, only_incomplete=False, dump_json=False, unroll=True, compact=False, pretty=False, file=None, err=None, ignore_errors=False, template=None, profile=False, eligible_jobs_max_lines=None, output_format='terminal')

Print the status of the project.

Parameters:
  • jobs (iterable of Job) – Only print status for the given jobs, or all if the argument is None. (Default value = None)
  • overview (bool) – Display an overview of the project status. (Default value = True)
  • overview_max_lines (int) – Limit the number of overview lines. (Default value = None)
  • detailed (bool) – Print a detailed status of each job. (Default value = False)
  • parameters (list of str) – Print the value of the specified parameters. (Default value = None)
  • param_max_width (int) – Limit the number of characters of parameter columns. (Default value = None)
  • expand (bool) – Present labels and operations in two separate tables. (Default value = False)
  • all_ops (bool) – Include operations that are not eligible to run. (Default value = False)
  • only_incomplete (bool) – Only show jobs that have eligible operations. (Default value = False)
  • dump_json (bool) – Output the data as JSON instead of printing the formatted output. (Default value = False)
  • unroll (bool) – Separate columns for jobs and the corresponding operations. (Default value = True)
  • compact (bool) – Print a compact version of the output. (Default value = False)
  • pretty (bool) – Prettify the output. (Default value = False)
  • file (file-like object) – Redirect all output to this file object. Defaults to sys.stdout.
  • err (file-like object) – Redirect all error output to this file object. Defaults to sys.stderr.
  • ignore_errors (bool) – Print status even if querying the scheduler fails. (Default value = False)
  • template (str) – User provided Jinja2 template file. (Default value = None)
  • profile (bool) – Show profile result. (Default value = False)
  • eligible_jobs_max_lines (int) – Limit the number of operations (and their eligible job counts) printed in the overview. (Default value = None)
  • output_format (str) – Status output format, supports: ‘terminal’ (default), ‘markdown’ or ‘html’.

run(jobs=None, names=None, pretend=False, np=None, timeout=None, num=None, num_passes=1, progress=False, order=None, ignore_conditions=<IgnoreConditions.NONE: 0>)

Execute all eligible operations for the given selection.

This function runs in a loop until no eligible operations remain, unless it reaches the maximum number of passes per operation or the maximum number of executions.

By default there is no limit on the total number of executions, but a specific operation will only be executed once per job. This is to avoid accidental infinite loops when no or faulty postconditions are provided.

Parameters:
  • jobs (iterable of Job) – Only execute operations for the given jobs, or all if the argument is None. (Default value = None)
  • names (iterable of str) – Only execute operations that match the provided set of names (interpreted as regular expressions), or all if the argument is None. (Default value = None)
  • pretend (bool) – Do not actually execute the operations, but show the commands that would have been executed. (Default value = False)
  • np (int) – Parallelize to the specified number of processors. Use -1 to parallelize to all available processing units. (Default value = None)
  • timeout (int) – An optional timeout for each operation in seconds after which execution will be cancelled. Use -1 to indicate no timeout (the default).
  • num (int) – The total number of operations that are executed will not exceed this argument if provided. (Default value = None)
  • num_passes (int or None) – The total number of executions of one specific job-operation pair will not exceed this argument. The default is 1; there is no limit if this argument is None.
  • progress (bool) – Show a progress bar during execution. (Default value = False)
  • order (str, callable, or None) –

    Specify the order of operations. Possible values are:

    • 'none' or None (no specific order)
    • 'by-job' (operations are grouped by job)
    • 'cyclic' (order operations cyclically by job)
    • 'random' (shuffle the execution order randomly)
    • callable (a callable returning a comparison key for an operation used to sort operations)

    The default value is 'none', which is equivalent to 'by-job' in the current implementation.

    Note

    Users are advised not to rely on a specific execution order as a substitute for defining the workflow in terms of preconditions and postconditions. However, a specific execution order may be more performant in cases where operations need to access and potentially lock shared resources.

  • ignore_conditions (IgnoreConditions) – Specify if preconditions and/or postconditions are to be ignored when determining eligibility. The default is IgnoreConditions.NONE.
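The pass-limiting behavior described above can be sketched as a simple loop. This is a hypothetical, single-process model, not the real run() implementation. Operations are plain callables paired with an eligibility predicate, jobs are dicts with an 'id' key, and run_sketch is an invented name.

```python
from collections import Counter
from typing import Callable, Dict, List, Optional, Tuple

# Hypothetical representation: name -> (eligible(job) predicate, operation function)
Operations = Dict[str, Tuple[Callable[[dict], bool], Callable[[dict], None]]]


def run_sketch(jobs: List[dict], operations: Operations,
               num: Optional[int] = None, num_passes: Optional[int] = 1) -> int:
    """Execute eligible operations until none remain or a limit is hit.

    Returns the total number of executions.
    """
    passes: Counter = Counter()
    executed = 0
    while True:
        progressed = False
        for job in jobs:  # 'by-job' order: all operations for one job, then the next
            for name, (eligible, func) in operations.items():
                if not eligible(job):
                    continue
                key = (job["id"], name)
                if num_passes is not None and passes[key] >= num_passes:
                    continue  # this job-operation pair has used up its passes
                if num is not None and executed >= num:
                    return executed  # global execution limit reached
                func(job)
                passes[key] += 1
                executed += 1
                progressed = True
        if not progressed:
            return executed


def increment(job):
    job["count"] = job.get("count", 0) + 1


# No postcondition: the operation always looks eligible.
ops = {"increment": (lambda job: True, increment)}
jobs = [{"id": "a"}, {"id": "b"}]
print(run_sketch(jobs, ops))  # 2: each job-operation pair runs once
```

With the default num_passes=1, an operation that lacks a postcondition still stops after one execution per job. This is the accidental-infinite-loop protection described above.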

scheduler_jobs(scheduler)

Fetch jobs from the scheduler.

This function will fetch all scheduler jobs from the scheduler and also expand bundled jobs automatically.

However, this function does not filter out scheduler jobs that are not associated with this project.

Parameters: scheduler (Scheduler) – The scheduler instance.
Yields: ClusterJob – All cluster jobs fetched from the scheduler.

submit(bundle_size=1, jobs=None, names=None, num=None, parallel=False, force=False, ignore_conditions=<IgnoreConditions.NONE: 0>, ignore_conditions_on_execution=<IgnoreConditions.NONE: 0>, **kwargs)

Submit function for the project’s main submit interface.

Parameters:
  • bundle_size (int) – Specify the number of operations to be bundled into one submission, defaults to 1.
  • jobs (iterable of Job) – Only submit operations for the given jobs, or all if the argument is None. (Default value = None)
  • names (iterable of str) – Only submit operations that match the provided set of names (interpreted as regular expressions), or all if the argument is None. (Default value = None)
  • num (int) – Limit the total number of submitted operations, defaults to no limit.
  • parallel (bool) – Execute all bundled operations in parallel. (Default value = False)
  • force (bool) – Ignore all warnings or checks during submission, just submit. (Default value = False)
  • ignore_conditions (IgnoreConditions) – Specify if preconditions and/or postconditions are to be ignored when determining eligibility. The default is IgnoreConditions.NONE.
  • ignore_conditions_on_execution (IgnoreConditions) – Specify if preconditions and/or postconditions are to be ignored when determining eligibility after submitting. The default is IgnoreConditions.NONE.
  • **kwargs – Additional keyword arguments forwarded to the environment's submit() method (ComputeEnvironment.submit()).
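One way to picture bundling is as splitting the selected job-operations into groups of bundle_size, each of which becomes one scheduler submission. The sketch below is a hypothetical illustration of that grouping only. It does not model scheduler interaction. make_bundles is an invented name, and rejecting values below 1 is a choice made for this sketch.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def make_bundles(items: Iterable[T], bundle_size: int = 1) -> Iterator[List[T]]:
    """Yield consecutive groups of at most bundle_size items."""
    if bundle_size < 1:
        raise ValueError("this sketch requires bundle_size >= 1")
    iterator = iter(items)
    while True:
        bundle = list(islice(iterator, bundle_size))
        if not bundle:
            return
        yield bundle


job_operations = [f"op-{i}" for i in range(5)]  # hypothetical job-operation ids
print(list(make_bundles(job_operations, bundle_size=2)))
# [['op-0', 'op-1'], ['op-2', 'op-3'], ['op-4']]
```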

FlowProject.post(condition, tag=None)

Define and evaluate postconditions for operations.

A postcondition is a function accepting one or more jobs as positional arguments (*jobs) that must evaluate to True for this operation to be considered complete. For example:

@Project.operation
@Project.post(lambda job: job.doc.get('bye'))
def bye(job):
    print('bye', job)
    job.doc.bye = True

The bye operation would be considered complete and therefore no longer eligible for execution once the ‘bye’ key in the job document evaluates to True.

An optional tag may be associated with the condition. These tags are used by detect_operation_graph() when comparing conditions for equality. The tag defaults to the bytecode of the function.

classmethod post.copy_from(*other_funcs)

Copy postconditions from other operation(s).

True if and only if all postconditions of other operation function(s) are met.

classmethod post.false(key)

Evaluate if a document key is False for the job(s).

Returns True if the specified key is present in the job document(s) and evaluates to False.

classmethod post.isfile(filename)

Determine if the specified file exists for the job(s).

classmethod post.never(func)

Return False.

classmethod post.not_(condition)

Return not condition(*jobs) for the provided condition function.

classmethod post.true(key)

Evaluate if a document key is True for the job(s).

Returns True if the specified key is present in the job document(s) and evaluates to True.

FlowProject.pre(condition, tag=None)

Define and evaluate preconditions for operations.

A precondition is a function accepting one or more jobs as positional arguments (*jobs) that must evaluate to True for this operation to be eligible for execution. For example:

@Project.operation
@Project.pre(lambda job: not job.doc.get('hello'))
def hello(job):
    print('hello', job)
    job.doc.hello = True

The hello operation would only execute if the ‘hello’ key in the job document does not evaluate to True.

An optional tag may be associated with the condition. These tags are used by detect_operation_graph() when comparing conditions for equality. The tag defaults to the bytecode of the function.

classmethod pre.after(*other_funcs)

Precondition to run an operation after other operations.

True if and only if all postconditions of other operation function(s) are met.

classmethod pre.copy_from(*other_funcs)

Copy preconditions from other operation(s).

True if and only if all preconditions of other operation function(s) are met.

classmethod pre.false(key)

Evaluate if a document key is False for the job(s).

Returns True if the specified key is present in the job document(s) and evaluates to False.

classmethod pre.isfile(filename)

Determine if the specified file exists for the job(s).

classmethod pre.never(func)

Return False.

classmethod pre.not_(condition)

Return not condition(*jobs) for the provided condition function.

classmethod pre.true(key)

Evaluate if a document key is True for the job(s).

Returns True if the specified key is present in the job document(s) and evaluates to True.
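The helpers above are factories that return ordinary condition functions. The following plain-Python sketch models their documented semantics; it is not signac-flow's source. A hypothetical FakeJob, with a dict document and a workspace directory, stands in for signac's Job. When several jobs are passed, the sketch assumes the results are combined with all().

```python
import os
import tempfile
from dataclasses import dataclass, field


@dataclass
class FakeJob:
    """Hypothetical stand-in for a signac Job."""

    ws: str
    document: dict = field(default_factory=dict)

    def isfile(self, filename):
        return os.path.isfile(os.path.join(self.ws, filename))


def true(key):
    # Key present in every job document and truthy.
    return lambda *jobs: all(key in job.document and bool(job.document[key]) for job in jobs)


def false(key):
    # Key present in every job document and falsy.
    return lambda *jobs: all(key in job.document and not job.document[key] for job in jobs)


def isfile(filename):
    # File exists in every job's workspace.
    return lambda *jobs: all(job.isfile(filename) for job in jobs)


def not_(condition):
    # Negate an arbitrary condition function.
    return lambda *jobs: not condition(*jobs)


with tempfile.TemporaryDirectory() as ws:
    job = FakeJob(ws, {"done": True, "failed": False})
    print(true("done")(job), false("failed")(job), false("missing")(job))  # True True False
    print(isfile("out.txt")(job))  # False
    open(os.path.join(ws, "out.txt"), "w").close()
    print(isfile("out.txt")(job), not_(isfile("out.txt"))(job))  # True False
```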

class flow.IgnoreConditions

Bases: enum.IntFlag

Flags that determine which conditions are used to determine job eligibility.

ALL = 3

Ignore all conditions.

NONE = 0

Check all conditions.

POST = 2

Ignore postconditions.

PRE = 1

Ignore preconditions.
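Because IgnoreConditions is an IntFlag, its members can be combined and tested with bitwise operators. So that it runs without signac-flow installed, the sketch below re-declares an equivalent enum with the values listed above. should_check is a hypothetical helper, not part of the library.

```python
from enum import IntFlag


class IgnoreConditions(IntFlag):
    """Local copy of the documented flag values."""

    NONE = 0
    PRE = 1
    POST = 2
    ALL = PRE | POST  # == 3


def should_check(kind: IgnoreConditions, ignore: IgnoreConditions) -> bool:
    """Return True if conditions of the given kind (PRE or POST) are still evaluated."""
    return not (ignore & kind)


print((IgnoreConditions.PRE | IgnoreConditions.POST) == IgnoreConditions.ALL)  # True
print(should_check(IgnoreConditions.PRE, IgnoreConditions.POST))  # True
print(should_check(IgnoreConditions.POST, IgnoreConditions.ALL))  # False
```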

Operations and Status

class flow.project.BaseFlowOperation(pre=None, post=None)

Bases: object

A BaseFlowOperation represents a data space operation acting on any job.

Every BaseFlowOperation is associated with a specific command.

Preconditions (pre) and postconditions (post) can be used to trigger an operation only when certain conditions are met. Conditions are callables that expect one or more instances of Job as positional arguments and return either True or False.

An operation is considered “eligible” for execution when all preconditions are met and when at least one of the postconditions is not met. Preconditions are always met when the list of preconditions is empty. Postconditions are never met when the list of postconditions is empty.

Note

This class should not be instantiated directly.

Parameters:
  • pre (sequence of callables) – List of preconditions.
  • post (sequence of callables) – List of postconditions.
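The eligibility rule above, including the behavior for empty condition lists, fits in a few lines. This sketch covers the documented rule only. is_eligible is a hypothetical helper, and a plain dict stands in for the job.

```python
from typing import Callable, Sequence

Condition = Callable[..., bool]


def is_eligible(job, pre: Sequence[Condition] = (), post: Sequence[Condition] = ()) -> bool:
    # all() of an empty sequence is True, so an empty precondition list is always met.
    pre_met = all(cond(job) for cond in pre)
    # Postconditions count as met only if the list is non-empty and every one holds,
    # so an empty postcondition list is never met.
    post_met = bool(post) and all(cond(job) for cond in post)
    return pre_met and not post_met


job = {"initialized": True}
print(is_eligible(job))  # True: no preconditions, and an empty post list is never met
print(is_eligible(job, pre=[lambda j: j["initialized"]],
                  post=[lambda j: "result" in j]))  # True: a postcondition is unmet
job["result"] = 42
print(is_eligible(job, post=[lambda j: "result" in j]))  # False: all postconditions met
```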

class flow.project.FlowOperation(op_func, pre=None, post=None)

Bases: flow.project.BaseFlowOperation

An operation that executes a Python function.

All operations without the @cmd decorator use this class. The callable op_func should be a function of one or more instances of Job.

Note

This class should not be instantiated directly.

Parameters:
  • op_func (callable) – A callable function of *jobs.
  • pre (sequence of callables) – List of preconditions.
  • post (sequence of callables) – List of postconditions.

__call__(*jobs)

Call the operation on the provided jobs.

Parameters: *jobs (one or more instances of Job) – The jobs passed to the operation.
Returns: The result of the operation function.
Return type: object

class flow.project.FlowCmdOperation(cmd, pre=None, post=None)

Bases: flow.project.BaseFlowOperation

An operation that executes a shell command.

When an operation has the @cmd decorator applied, it is instantiated as a FlowCmdOperation. The operation should be a function of one or more positional arguments that are instances of Job. The command (cmd) may either be a callable that expects one or more instances of Job as positional arguments and returns a string containing valid shell commands, or the string of commands itself. In either case, the resulting string may contain any attributes of the job (or jobs) placed in curly braces, which will then be substituted by Python string formatting.

Note

This class should not be instantiated directly.

Parameters:
  • cmd (str or callable) – The command to execute the operation. Callable values will be provided one or more positional arguments (*jobs) that are instances of Job. String values will be formatted with cmd.format(jobs=jobs) where jobs is a tuple of Job, or cmd.format(jobs=jobs, job=job) if only one job is provided.
  • pre (sequence of callables) – List of preconditions.
  • post (sequence of callables) – List of postconditions.

__call__(*jobs)

Return the command formatted with the supplied job(s).
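The string-formatting rule for cmd can be illustrated with str.format. FakeJob below is a hypothetical stand-in that exposes only the id and ws attributes. format_cmd is an invented helper that mirrors the documented cmd.format(jobs=jobs) / cmd.format(jobs=jobs, job=job) behavior; it is not signac-flow's code.

```python
from collections import namedtuple

# Hypothetical stand-in for a signac Job, exposing only what the commands use.
FakeJob = namedtuple("FakeJob", ["id", "ws"])


def format_cmd(cmd, *jobs):
    """Resolve a command string or callable, then substitute job attributes."""
    if callable(cmd):
        cmd = cmd(*jobs)
    if len(jobs) == 1:
        return cmd.format(jobs=jobs, job=jobs[0])
    return cmd.format(jobs=jobs)


a = FakeJob(id="abc123", ws="/workspace/abc123")
b = FakeJob(id="def456", ws="/workspace/def456")

print(format_cmd("echo {job.id}", a))  # echo abc123
print(format_cmd("diff {jobs[0].ws}/out.txt {jobs[1].ws}/out.txt", a, b))
print(format_cmd(lambda job: "cd {job.ws} && ls", a))  # cd /workspace/abc123 && ls
```

Because the command passes through str.format, literal curly braces in a shell command would need to be written as {{ and }}.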

Labels

class flow.label(name=None)

Bases: object

Decorate a FlowProject class function as a label function.

For example:

from flow import FlowProject, label

class MyProject(FlowProject):

    @label()
    def foo(self, job):
        return True

__call__(func)

Add the function as a label.

This call operator allows the class to be used as a decorator.

Parameters: func (callable) – The function to decorate.
Returns: The decorated function.
Return type: callable

class flow.staticlabel(name=None)

Bases: flow.labels.label

A label decorator for staticmethods.

This decorator implies “staticmethod”!

__call__(func)

Add the function as a label.

This call operator allows the class to be used as a decorator.

Parameters: func (callable) – The function to decorate.
Returns: The decorated function.
Return type: callable

class flow.classlabel(name=None)

Bases: flow.labels.label

A label decorator for classmethods.

This decorator implies “classmethod”!

__call__(func)

Add the function as a label.

This call operator allows the class to be used as a decorator.

Parameters: func (callable) – The function to decorate.
Returns: The decorated function.
Return type: callable

@flow.cmd

flow.cmd(func)

Use this decorator to indicate that func returns a shell command.

If the decorated function is an operation function of a FlowProject, its return value is interpreted as a shell command to execute, rather than the function being run as a Python operation.

For example:

@FlowProject.operation
@flow.cmd
def hello(job):
    return "echo {job.id}"

Note

The final shell command generated for run() or submit() still respects directives and will prepend e.g. MPI or OpenMP prefixes to the shell command provided here.

@flow.with_job

flow.with_job(func)

Use this decorator to call func(job) with job as a context manager.

If the decorated function is an operation function of a FlowProject, this is the same as wrapping the function body in with job:.

For example:

@FlowProject.operation
@flow.with_job
def hello(job):
    print("hello {}".format(job))

Is equivalent to:

@FlowProject.operation
def hello(job):
    with job:
        print("hello {}".format(job))

This also works with the @cmd decorator:

@FlowProject.operation
@flow.with_job
@flow.cmd
def hello(job):
    return "echo 'hello {}'".format(job)

Is equivalent to:

@FlowProject.operation
@flow.cmd
def hello(job):
    # Enter the job workspace, run the command, and return to the original directory on exit.
    return 'trap "cd `pwd`" EXIT && cd {} && echo "hello {}"'.format(job.ws, job)
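A plain-Python sketch of the context-manager behavior: the decorated function runs inside with job:, which for a signac Job switches into the job's workspace directory and switches back afterwards. FakeJob and this with_job are hypothetical re-implementations for illustration only. They do not cover the @cmd case.

```python
import functools
import os
import tempfile


class FakeJob:
    """Hypothetical stand-in for a signac Job used as a context manager."""

    def __init__(self, ws):
        self.ws = ws
        self._previous = []

    def __enter__(self):
        self._previous.append(os.getcwd())
        os.chdir(self.ws)  # enter the job workspace
        return self

    def __exit__(self, *exc_info):
        os.chdir(self._previous.pop())  # restore the previous working directory
        return False  # do not suppress exceptions


def with_job(func):
    @functools.wraps(func)
    def wrapper(job):
        with job:
            return func(job)

    return wrapper


@with_job
def where_am_i(job):
    return os.getcwd()


with tempfile.TemporaryDirectory() as ws:
    before = os.getcwd()
    inside = where_am_i(FakeJob(ws))
    after = os.getcwd()
    print(os.path.samefile(inside, ws), after == before)  # True True
```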

@flow.directives

class flow.directives(**kwargs)

Bases: object

Decorator for operation functions to provide additional execution directives.

Directives can for example be used to provide information about required resources such as the number of processes required for execution of parallelized operations. For more information, read about Submission Directives.

Supported Directives:

fork

The fork directive can be set to True to enforce that a particular operation is always executed within a subprocess and not within the Python interpreter’s process even if there are no other reasons that would prevent that.

Note

Setting fork=False will not prevent forking if there are other reasons for forking, such as a timeout.

np

The number of tasks to launch for a given operation, i.e., the number of CPU cores to be requested for a given operation.

Expects a natural number (i.e. an integer >= 1). This directive introspects into the “nranks” or “omp_num_threads” directives and uses their product if it is greater than the current set value. Defaults to 1.
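The np introspection rule can be expressed as a small function. This is a sketch under assumptions, not signac-flow's implementation. finalize_np is a hypothetical name. Treating an unset or zero nranks or omp_num_threads as a factor of 1 is also an assumption; it lets the product count when only one of them is set.

```python
def finalize_np(np=1, nranks=0, omp_num_threads=0):
    """Return the effective np: the larger of np and nranks * omp_num_threads."""
    # Assumption: a value of 0 means "not set" and contributes a factor of 1.
    product = max(nranks, 1) * max(omp_num_threads, 1)
    return product if product > np else np


print(finalize_np(np=1, nranks=4, omp_num_threads=2))  # 8
print(finalize_np(np=16, nranks=4, omp_num_threads=2))  # 16
print(finalize_np(np=1, nranks=4))  # 4
```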

ngpu

The number of GPUs to use for this operation.

Expects a nonnegative integer. Defaults to 0.

nranks

The number of MPI ranks to use for this operation. Defaults to 0.

Expects a nonnegative integer.

omp_num_threads

The number of OpenMP threads to use for this operation. Defaults to 0.

Expects a nonnegative integer.

executable

Return the path to the executable to be used for an operation.

The executable directive expects a string pointing to a valid executable file in the current file system.

By default this should point to a Python executable (interpreter); however, if the FlowProject path is an empty string, the executable can be a path to an executable Python script. Defaults to sys.executable.

walltime

The number of hours to request for executing this job.

This directive expects a float representing the walltime in hours. Fractional values are supported. For example, a value of 0.5 will request 30 minutes of walltime. If no walltimes are requested, the submission will not specify a walltime in the output script. Some schedulers have a default value that will be used.

For example:

@Project.operation
@directives(walltime=24)
def op(job):
    # This operation takes 1 day to run
    pass
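The walltime is given in fractional hours, so converting it to the HH:MM:SS form that many schedulers accept is simple arithmetic. The helper below is a hypothetical illustration. The exact format written into a submission script depends on the scheduler template.

```python
def walltime_to_hms(hours: float) -> str:
    """Convert fractional hours to an HH:MM:SS string (rounded to whole seconds)."""
    total_seconds = round(hours * 3600)
    h, remainder = divmod(total_seconds, 3600)
    m, s = divmod(remainder, 60)
    return f"{h:02d}:{m:02d}:{s:02d}"


print(walltime_to_hms(0.5))   # 00:30:00
print(walltime_to_hms(24))    # 24:00:00
print(walltime_to_hms(1.25))  # 01:15:00
```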

processor_fraction

Fraction of a resource to use on a single operation.

If set to 0.5 for a bundled job with 20 operations (all with ‘np’ set to 1), 10 CPUs will be used. Defaults to 1.

Note

This can be particularly useful on Stampede2’s launcher.
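The processor_fraction example comes down to simple arithmetic. Each operation contributes np × processor_fraction processors, and the contributions are summed over the bundle. The helper is hypothetical, and rounding a fractional total up to a whole number of CPUs is an assumption.

```python
import math
from typing import Sequence


def bundle_cpus(nps: Sequence[int], processor_fraction: float = 1.0) -> int:
    """CPUs requested for a bundle: sum of np * processor_fraction, rounded up."""
    return math.ceil(sum(np * processor_fraction for np in nps))


print(bundle_cpus([1] * 20, processor_fraction=0.5))  # 10
print(bundle_cpus([1] * 20))  # 20
```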

memory

The memory to request for this operation.

The memory directive accepts a float, int, or string. A valid memory argument is defined as:

  • Positive numeric value with suffix “g” or “G” indicating memory requested in gigabytes.

For example:

@Project.operation
@directives(memory="4g")
def op(job):
    pass

  • Positive numeric value with suffix “m” or “M” indicating memory requested in megabytes.

For example:

@Project.operation
@directives(memory="512m")
def op(job):
    pass

  • Positive numeric value with no suffix indicating memory requested in gigabytes.

For example:

@Project.operation
@directives(memory="4")
def op1(job):
    pass

@Project.operation
@directives(memory=4)
def op2(job):
    pass
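The three accepted forms can be normalized to gigabytes with a small parser. This hypothetical validator is written from the rules above; it is not signac-flow's own. In particular, converting megabytes at 1024 MB per GB is an assumption.

```python
def memory_in_gb(value) -> float:
    """Normalize a memory directive value (4, "4", "4g", "512m") to gigabytes."""
    if isinstance(value, bool):
        raise ValueError("memory must be a number or string, not bool")
    if isinstance(value, (int, float)):
        amount, unit = float(value), "g"  # no suffix: gigabytes
    elif isinstance(value, str):
        text, unit = value.strip().lower(), "g"  # no suffix: gigabytes
        if text.endswith(("g", "m")):
            text, unit = text[:-1], text[-1]
        try:
            amount = float(text)
        except ValueError:
            raise ValueError(f"invalid memory value: {value!r}") from None
    else:
        raise ValueError(f"invalid memory value: {value!r}")
    if not amount > 0:  # also rejects NaN
        raise ValueError(f"memory must be positive: {value!r}")
    # Assumption: 1 GB = 1024 MB for the megabyte suffix.
    return amount if unit == "g" else amount / 1024


print(memory_in_gb("4g"), memory_in_gb("512m"), memory_in_gb("4"), memory_in_gb(4))
# 4.0 0.5 4.0 4.0
```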

__call__(func)

Add directives to the function.

This call operator allows the class to be used as a decorator.

Parameters: func (callable) – The function to decorate.
Returns: The decorated function.
Return type: callable

classmethod copy_from(func)

Copy directives from another operation.

flow.init()

flow.init(alias=None, template=None, root=None)

Initialize a templated FlowProject module.

Parameters:
  • alias (str) – Python identifier used as a file name for the template output. Uses "project" if None. (Default value = None)
  • template (str) –

    Name of the template to use. Built-in templates are:

    • "minimal"
    • "example"
    • "testing"

    Uses "minimal" if None. (Default value = None)

  • root (str) – Directory where the output file is placed. Uses the current working directory if None. (Default value = None)
Returns:

List of file names created.

Return type:

list

flow.get_environment()

flow.get_environment(test=False, import_configured=True)

Attempt to detect the present environment.

This function iterates through all defined ComputeEnvironment classes in reversed order of definition and returns the first environment where the is_present() method returns True.

Parameters:
  • test (bool) – Whether to return the TestEnvironment. (Default value = False)
  • import_configured (bool) – Whether to import environments specified in the flow configuration. (Default value = True)
Returns:

The detected environment class.

Return type:

ComputeEnvironment
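The detection loop can be sketched with plain classes. Everything here is hypothetical. The environment classes, the REGISTERED list, and the present switch stand in for signac-flow's ComputeEnvironment registry and its is_present() implementations.

```python
from typing import List, Optional, Type


class Environment:
    """Hypothetical base class; subclasses override is_present()."""

    @classmethod
    def is_present(cls) -> bool:
        return False


class StandardEnvironment(Environment):
    @classmethod
    def is_present(cls) -> bool:
        return True  # generic fallback, always available


class ClusterEnvironment(Environment):
    present = False  # hypothetical switch; a real check might inspect the hostname

    @classmethod
    def is_present(cls) -> bool:
        return cls.present


# Classes listed in order of definition.
REGISTERED: List[Type[Environment]] = [StandardEnvironment, ClusterEnvironment]


def detect_environment(registered=REGISTERED) -> Optional[Type[Environment]]:
    # Check in reverse definition order, so the generic fallback defined first
    # is only chosen when nothing defined later is present.
    for env in reversed(registered):
        if env.is_present():
            return env
    return None


print(detect_environment().__name__)  # StandardEnvironment
```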

The FlowGroup

class flow.project.FlowGroup(name, operations=None, operation_directives=None, options='')

Bases: object

A FlowGroup represents a subset of a workflow for a project.

A FlowGroup is associated with one or more instances of BaseFlowOperation.

Examples

In the example below, the directives will be {'nranks': 4} for op1 and {'nranks': 2, 'executable': 'python3'} for op2.

group = FlowProject.make_group(name='example_group')

@group.with_directives(nranks=4)
@FlowProject.operation
@directives(nranks=2, executable="python3")
def op1(job):
    pass

@group
@FlowProject.operation
@directives(nranks=2, executable="python3")
def op2(job):
    pass

Parameters:
  • name (str) – The name of the group to be used when calling from the command line.
  • operations (dict) – A dictionary of operations where the keys are operation names and each value is a BaseFlowOperation.
  • operation_directives (dict) – A dictionary of additional parameters that provide instructions on how to execute a particular operation, e.g., specifically required resources. Operation names are keys and the dictionaries of directives are values. If an operation does not have directives specified, then the directives of the singleton group containing that operation are used. To prevent this, set the directives to an empty dictionary for that operation.
  • options (str) – A string of options to append to the output of the object’s call method. This allows options like --num_passes to be given to a group.
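The fallback rule for operation_directives can be sketched as a plain dictionary lookup. resolve_directives() is a hypothetical helper, not part of signac-flow. It assumes that an explicit entry replaces the singleton-group directives entirely rather than being merged with them, which matches the op1/op2 example above.

```python
def resolve_directives(op_name, operation_directives, singleton_directives):
    """Return the directives a group would use for one operation.

    operation_directives maps operation names to group-specific directives;
    singleton_directives maps operation names to the directives of each
    operation's singleton group.
    """
    if op_name in operation_directives:
        # An explicit entry wins, even an empty dict (which disables the fallback).
        return operation_directives[op_name]
    # No entry: fall back to the directives of the operation's singleton group.
    return singleton_directives.get(op_name, {})


singleton = {
    "op1": {"nranks": 2, "executable": "python3"},
    "op2": {"nranks": 2, "executable": "python3"},
}
group_specific = {"op1": {"nranks": 4}}

print(resolve_directives("op1", group_specific, singleton))  # {'nranks': 4}
print(resolve_directives("op2", group_specific, singleton))  # {'nranks': 2, 'executable': 'python3'}
```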

add_operation(name, operation, directives=None)

Add an operation to the FlowGroup.

Parameters:
  • name (str) – The name of the operation.
  • operation (BaseFlowOperation) – The workflow operation to add to the FlowGroup.
  • directives (dict) – The operation-specific directives. (Default value = None)

isdisjoint(group)

Return whether two groups are disjoint.

Groups are disjoint if they do not share any common operations.

Parameters: group (FlowGroup) – The other FlowGroup to compare to.
Returns: True if group and self share no operations, otherwise False.
Return type: bool
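Because a group's operations are stored in a dictionary keyed by operation name, the disjointness test amounts to a set comparison. The sketch below assumes that operations are compared by name; groups_are_disjoint() is a hypothetical helper, not the library's method.

```python
def groups_are_disjoint(operations_a, operations_b):
    """Return True if two operations dictionaries share no operation names."""
    # Iterating over a dict yields its keys, i.e. the operation names.
    return set(operations_a).isdisjoint(operations_b)


group_a = {"op1": None, "op2": None}  # values would be BaseFlowOperation instances
group_b = {"op3": None}
group_c = {"op2": None, "op3": None}

print(groups_are_disjoint(group_a, group_b))  # True
print(groups_are_disjoint(group_a, group_c))  # False
```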

class flow.project.FlowGroupEntry(name, options='')

Bases: object

A FlowGroupEntry registers operations for inclusion into a FlowGroup.

Application developers should not directly instantiate this class, but use make_group() instead.

Operation functions can be marked for inclusion into a FlowGroup by decorating the functions with a corresponding FlowGroupEntry. If the operation requires specific directives, with_directives() accepts a dictionary of directives and returns a decorator that can be applied to the operation to mark it for inclusion in the group and indicate that it should be executed using the specified directives. This overrides the default directives specified by flow.directives().

Parameters:
  • name (str) – The name of the FlowGroup to be created.
  • options (str) – The FlowProject.run() options to pass when submitting the group. These will be included in all submissions. Submissions use run commands to execute.

__call__(func)

Add the function into the group’s operations.

This call operator allows the class to be used as a decorator.

Parameters: func (callable) – The function to decorate.
Returns: The decorated function.
Return type: callable

with_directives(directives)

Return a decorator that sets group-specific directives for the operation.

Parameters: directives (dict) – Directives to use for resource requests and running the operation through the group.
Returns: A decorator which registers the function into the group with specified directives.
Return type: function
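Both registration paths, the call operator and with_directives(), follow the ordinary Python decorator pattern. The class below is a hypothetical sketch of that pattern, not FlowGroupEntry itself; in particular, recording members in a dictionary on the entry object is an assumption made for illustration.

```python
class GroupEntrySketch:
    """Hypothetical stand-in for FlowGroupEntry that records group members."""

    def __init__(self, name, options=""):
        self.name = name
        self.options = options
        # Maps operation name -> group-specific directives (None if not given).
        self.members = {}

    def __call__(self, func):
        # Plain decorator use: register the function without extra directives.
        self.members[func.__name__] = None
        return func

    def with_directives(self, directives):
        # Return a decorator that registers the function with a copy of the dict.
        def decorator(func):
            self.members[func.__name__] = dict(directives)
            return func

        return decorator


group = GroupEntrySketch("example_group")


@group.with_directives({"nranks": 4})
def op1(job):
    pass


@group
def op2(job):
    pass


print(group.members)  # {'op1': {'nranks': 4}, 'op2': None}
```

Both decorators return the original function unchanged, so decorated operations stay directly callable.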

Compute Environments

Detection of compute environments.

This module provides the ComputeEnvironment class, which can be subclassed to automatically detect specific computational environments.

This enables the user to adjust their workflow based on the present environment, e.g., to adjust scheduler submission scripts.

flow.environment.setup(py_modules, **attrs)

Set up user-defined environment modules.

Use this function in place of setuptools.setup() to not only install an environment’s module, but also register it with the global signac configuration. Once registered, the environment is automatically imported when the get_environment() function is called.

flow.environment.template_filter(func)

Decorate a function as a ComputeEnvironment template filter.

This decorator is applied to methods defined in a subclass of ComputeEnvironment that are used in that environment’s templates. The decorated function becomes a class method that is available as a jinja2 filter in templates rendered by a FlowProject with that ComputeEnvironment.

Parameters: func (callable) – Function to decorate.
Returns: Decorated function.
Return type: callable
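The decorator described above marks a function and turns it into a class method. The sketch below reproduces that idea with the standard library only; the marker attribute _is_template_filter and the collect_filters() helper are hypothetical names, and the way signac-flow actually gathers filters may differ. collect_filters() only inspects methods defined directly on the given class, not inherited ones.

```python
def template_filter_sketch(func):
    """Mark func as a template filter and turn it into a class method."""
    func._is_template_filter = True  # hypothetical marker attribute
    return classmethod(func)


def collect_filters(env_cls):
    """Return {name: bound class method} for filters defined directly on env_cls."""
    filters = {}
    for name, attr in vars(env_cls).items():
        if isinstance(attr, classmethod) and getattr(attr.__func__, "_is_template_filter", False):
            filters[name] = getattr(env_cls, name)  # bound to env_cls
    return filters


class MyEnvironmentSketch:
    @template_filter_sketch
    def double(cls, value):
        return 2 * value


filters = collect_filters(MyEnvironmentSketch)
print(filters["double"](21))  # 42
```

With jinja2, a dictionary like this could be merged into an Environment's filters mapping so that templates can write {{ value | double }}.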

class flow.environment.ComputeEnvironment

Bases: object

Define computational environments.

The ComputeEnvironment class allows us to automatically determine specific environments in order to programmatically adjust workflows in different environments.

The default method for the detection of a specific environment is to provide a regular expression matching the environment’s hostname. For example, if the hostname is my-server.com, one could identify the environment by setting the hostname_pattern to 'my-server'.
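Hostname-based detection can be approximated with a regular-expression match. This sketch assumes the pattern is applied with re.match (anchored at the start of the string) against the machine's fully qualified domain name; matches_hostname() is a hypothetical helper, not part of signac-flow.

```python
import re
import socket


def matches_hostname(hostname_pattern, hostname=None):
    """Return True if hostname_pattern matches at the start of the hostname."""
    if hostname is None:
        hostname = socket.getfqdn()  # fully qualified name of this machine
    return re.match(hostname_pattern, hostname) is not None


print(matches_hostname("my-server", "my-server.com"))     # True
print(matches_hostname("my-server", "login1.other.org"))  # False
```

Since the pattern is a regular expression, a literal dot should be escaped, e.g. r"my-server\.com".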

classmethod add_args(parser)

Add arguments related to this compute environment to an argument parser.

Parameters: parser (argparse.ArgumentParser) – The argument parser where arguments will be added.

classmethod get_config_value(key, default=<flow.util.config._GetConfigValueNoneType object>)

Request a value from the user’s configuration.

This method should be used whenever values need to be provided that are specific to a user’s environment, e.g. account names.

When a key is not configured and no default value is provided, a SubmitError will be raised and the user will be prompted to add the missing key to their configuration.

Note that the key is automatically expanded to be specific to this environment definition. For example, a key should be 'account', not 'MyEnvironment.account'.

Parameters:
  • key (str) – The environment specific configuration key.
  • default (str) – A default value in case the key cannot be found within the user’s configuration.
Returns:

The value or default value.

Return type:

object

Raises:

SubmitError – If the key is not in the user’s configuration and no default value is provided.
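The key expansion and the sentinel default shown in the signature can be sketched as follows. This assumes a flat configuration dictionary with dotted keys, whereas the real configuration is a nested signac config; get_config_value() here and SubmitErrorSketch are hypothetical stand-ins. The sentinel object, rather than None, presumably lets None itself be passed as a valid default.

```python
_MISSING = object()  # sentinel, so that None remains usable as a real default


class SubmitErrorSketch(RuntimeError):
    """Stand-in for flow.errors.SubmitError (also a RuntimeError subclass)."""


def get_config_value(config, env_name, key, default=_MISSING):
    """Look up an environment-specific key in a flat, dotted-key config dict."""
    full_key = f"{env_name}.{key}"  # 'account' becomes 'MyEnvironment.account'
    if full_key in config:
        return config[full_key]
    if default is _MISSING:
        raise SubmitErrorSketch(f"Please add '{full_key}' to your configuration.")
    return default


config = {"MyEnvironment.account": "abc123"}
print(get_config_value(config, "MyEnvironment", "account"))             # abc123
print(get_config_value(config, "MyEnvironment", "partition", "debug"))  # debug
```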

classmethod get_prefix(operation, parallel=False, mpi_prefix=None, cmd_prefix=None)

Template filter generating a command prefix from directives.

Parameters:
  • operation (flow.project._JobOperation) – The operation to be prefixed.
  • parallel (bool) – If True, operations are assumed to be executed in parallel, which means that the total number of tasks is the sum of all tasks rather than the maximum number of tasks. Default is set to False.
  • mpi_prefix (str) – User-defined mpi_prefix string. Default is set to None. This will be deprecated and removed in the future.
  • cmd_prefix (str) – User-defined cmd_prefix string. Default is set to None. This will be deprecated and removed in the future.
Returns:

The prefix to be added to the operation’s command.

Return type:

str
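The effect of the parallel flag on the task count is simple arithmetic. The hypothetical helper total_tasks() below assumes three operations needing 4, 2, and 2 tasks; the reasoning given in the comments (tasks accumulate when operations run side by side, the largest one dominates otherwise) is an interpretation of the parameter description, not taken from the library's code.

```python
def total_tasks(tasks_per_operation, parallel=False):
    """Return the task count implied by the parallel flag (hypothetical helper)."""
    if parallel:
        # Operations run at the same time, so their task counts add up.
        return sum(tasks_per_operation)
    # Otherwise only the largest operation's task count is needed at once.
    return max(tasks_per_operation)


print(total_tasks([4, 2, 2], parallel=True))   # 8
print(total_tasks([4, 2, 2], parallel=False))  # 4
```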

classmethod get_scheduler()

Return an environment-specific scheduler driver.

The returned scheduler class provides a standardized interface to different scheduler implementations.

classmethod is_present()

Determine whether this specific compute environment is present.

The default method for environment detection is to match a hostname pattern or to delegate the environment check to the associated scheduler type.

classmethod submit(script, flags=None, *args, **kwargs)

Submit a job submission script to the environment’s scheduler.

Scripts should be submitted to the environment rather than directly to the scheduler, to allow for environment-specific post-processing.

Parameters:
  • script (str) – The script to submit.
  • flags (list) – A list of additional flags to provide to the scheduler. (Default value = None)
  • *args – Positional arguments forwarded to the scheduler’s submit method.
  • **kwargs – Keyword arguments forwarded to the scheduler’s submit method.
Returns:

Status of job, if submitted.

Return type:

JobStatus.submitted or None

class flow.environment.StandardEnvironment

Bases: flow.environment.ComputeEnvironment

Default environment which is always present.

classmethod is_present()

Determine whether this specific compute environment is present.

The StandardEnvironment is always present, so this returns True.

class flow.environment.NodesEnvironment

A compute environment consisting of multiple compute nodes.

Each compute node is assumed to have a specific number of compute units, e.g., CPUs.

class flow.environment.TestEnvironment

Bases: flow.environment.ComputeEnvironment

Environment used for testing.

The test environment will print a mocked submission script and submission commands to screen. This enables testing of the job submission script generation in environments without a real scheduler.

scheduler_type

alias of flow.scheduling.fake_scheduler.FakeScheduler

class flow.environment.SimpleSchedulerEnvironment

Bases: flow.environment.ComputeEnvironment

An environment for the simple-scheduler scheduler.

scheduler_type

alias of flow.scheduling.simple_scheduler.SimpleScheduler

class flow.environment.PBSEnvironment

An environment with a PBS scheduler.

class flow.environment.SlurmEnvironment

An environment with a SLURM scheduler.

class flow.environment.LSFEnvironment

An environment with an LSF scheduler.

flow.environment.registered_environments(import_configured=True)

Return a list of registered environments.

Parameters: import_configured (bool) – Whether to import environments specified in the flow configuration. (Default value = True)
Returns: List of registered environments.
Return type: list

Schedulers

Defines the API for the scheduling system.

class flow.scheduling.base.JobStatus

Bases: enum.IntEnum

Classifies the job’s execution status.

active = 7

The cluster job is actively running.

error = 8

The cluster job is in an error or failed state.

held = 5

The cluster job is held.

inactive = 3

The cluster job is inactive.

This includes states like completed, cancelled, or timed out.

placeholder = 127

A placeholder state that is used for status rendering when no operations are eligible.

queued = 6

The cluster job is queued.

registered = 2

The cluster job is registered with the scheduler, but no other status is known.

submitted = 4

The cluster job has been submitted.

Note that this state is never returned by a scheduler, but is an assumed state immediately after a cluster job is submitted.

unknown = 1

Unknown cluster job status.

user = 128

All user-defined states must be >=128 in value.
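Because JobStatus is an IntEnum, its members compare and sort like plain integers. The class below is a standalone copy of the documented values, named JobStatusSketch so the example runs without signac-flow; is_user_defined() is a hypothetical helper expressing the >=128 rule.

```python
from enum import IntEnum


class JobStatusSketch(IntEnum):
    """Standalone copy of the documented JobStatus values, for illustration."""

    unknown = 1
    registered = 2
    inactive = 3
    submitted = 4
    held = 5
    queued = 6
    active = 7
    error = 8
    placeholder = 127
    user = 128


def is_user_defined(value):
    """User-defined states must be >= 128."""
    return value >= JobStatusSketch.user


# IntEnum members compare and sort like plain integers.
print(JobStatusSketch.active > JobStatusSketch.queued)               # True
print(max(JobStatusSketch.held, JobStatusSketch.submitted).name)     # held
print(JobStatusSketch(6).name)                                       # queued
print(is_user_defined(130), is_user_defined(JobStatusSketch.error))  # True False
```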

class flow.scheduling.base.ClusterJob(job_id, status=None)

Bases: object

Class representing a cluster job.

name()

Return the name of the cluster job.

status()

Return the status of the cluster job.

class flow.scheduling.base.Scheduler

Bases: abc.ABC

Abstract base class for schedulers.

classmethod is_present()

Return True if the scheduler is detected.

jobs()

Yield all cluster jobs.

Yields: ClusterJob – Cluster job.

submit(script, **kwargs)

Submit a job script to the scheduler for execution.
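A custom scheduler fills in the three methods listed above. The sketch below is a hypothetical, standard-library-only mirror of that interface: whether the real base class marks jobs() and submit() with @abstractmethod is an assumption, and RecordingScheduler (which records scripts instead of running them, similar in spirit to FakeScheduler) yields job ids rather than ClusterJob objects.

```python
from abc import ABC, abstractmethod


class SchedulerSketch(ABC):
    """Hypothetical mirror of the documented Scheduler interface."""

    @classmethod
    def is_present(cls):
        """Return True if the scheduler is detected (never, by default)."""
        return False

    @abstractmethod
    def jobs(self):
        """Yield all cluster jobs."""

    @abstractmethod
    def submit(self, script, **kwargs):
        """Submit a job script to the scheduler for execution."""


class RecordingScheduler(SchedulerSketch):
    """Toy scheduler that stores scripts instead of running them."""

    def __init__(self):
        self._scripts = {}  # job id -> script text

    def jobs(self):
        # The real interface yields ClusterJob objects; this sketch yields ids.
        yield from self._scripts

    def submit(self, script, *, pretend=False, **kwargs):
        if pretend:
            # Simulate the submission without recording anything.
            return True
        job_id = f"job{len(self._scripts) + 1}"
        self._scripts[job_id] = script
        return True


scheduler = RecordingScheduler()
scheduler.submit("#!/bin/bash\necho hello")
scheduler.submit("#!/bin/bash\necho test", pretend=True)
print(list(scheduler.jobs()))  # ['job1']
```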

class flow.scheduling.fake_scheduler.FakeScheduler

Bases: flow.scheduling.base.Scheduler

Implementation of the abstract Scheduler class for a fake scheduler.

This scheduler does not actually schedule (or execute) any jobs, but it can be used to test the submission workflow.

classmethod is_present()

Return False.

The FakeScheduler is never present unless manually specified.

jobs()

Return None (no jobs are scheduled by the FakeScheduler).

submit(script, **kwargs)

Print the script to screen.

Parameters:
  • script (str) – Script to print.
  • **kwargs – Keyword arguments (ignored).

class flow.scheduling.lsf.LSFScheduler(user=None)

Bases: flow.scheduling.base.Scheduler

Implementation of the abstract Scheduler class for LSF schedulers.

This class can submit cluster jobs to an LSF scheduler and query their current status.

Parameters:
  • user (str) – Limit the status information to cluster jobs submitted by user.
  • **kwargs – Forwarded to the parent constructor.

classmethod is_present()

Return True if an LSF scheduler is detected.

jobs()

Yield cluster jobs by querying the scheduler.

submit(script, *, after=None, hold=False, pretend=False, flags=None, **kwargs)

Submit a job script for execution to the scheduler.

Parameters:
  • script (str) – The job script submitted for execution.
  • after (str) – Execute the submitted script after a job with this id has completed. (Default value = None)
  • hold (bool) – Whether to hold the job upon submission. (Default value = False)
  • pretend (bool) – If True, do not actually submit the script, but only simulate the submission. Can be used to test whether the submission would be successful. Note that a successful “pretend” submission does not guarantee that the actual submission will succeed. (Default value = False)
  • flags (list) – Additional arguments to pass through to the scheduler submission command. (Default value = None)
  • **kwargs – Additional keyword arguments (ignored).
Returns:

True if the submission command succeeds (or in pretend mode).

Return type:

bool

Raises:

SubmitError – If the submission command fails.

class flow.scheduling.pbs.PBSScheduler(user=None)

Bases: flow.scheduling.base.Scheduler

Implementation of the abstract Scheduler class for PBS schedulers.

This class can submit cluster jobs to a PBS scheduler and query their current status.

Parameters:
  • user (str) – Limit the status information to cluster jobs submitted by user.
  • **kwargs – Forwarded to the parent constructor.

classmethod is_present()

Return True if a PBS scheduler is detected.

jobs()

Yield cluster jobs by querying the scheduler.

submit(script, *, after=None, hold=False, pretend=False, flags=None, **kwargs)

Submit a job script for execution to the scheduler.

Parameters:
  • script (str) – The job script submitted for execution.
  • after (str) – Execute the submitted script after a job with this id has completed. (Default value = None)
  • hold (bool) – Whether to hold the job upon submission. (Default value = False)
  • pretend (bool) – If True, do not actually submit the script, but only simulate the submission. Can be used to test whether the submission would be successful. Note that a successful “pretend” submission does not guarantee that the actual submission will succeed. (Default value = False)
  • flags (list) – Additional arguments to pass through to the scheduler submission command. (Default value = None)
  • **kwargs – Additional keyword arguments (ignored).
Returns:

True if the submission command succeeds (or in pretend mode).

Return type:

bool

Raises:

SubmitError – If the submission command fails.

class flow.scheduling.simple_scheduler.SimpleScheduler

Bases: flow.scheduling.base.Scheduler

Implementation of the abstract Scheduler class for the bundled simple-scheduler.

The package signac-flow includes a script in bin/simple-scheduler that is a simple model of a cluster job scheduler. The simple-scheduler script is designed primarily for testing and demonstration.

This class can submit cluster jobs to the built-in simple scheduler and query their current status.

classmethod is_present()

Return True if a SimpleScheduler is detected.

jobs()

Yield cluster jobs by querying the scheduler.

submit(script, *, pretend=False, **kwargs)

Submit a job script for execution to the scheduler.

Parameters:
  • script (str) – The job script submitted for execution.
  • pretend (bool) – If True, do not actually submit the script, but only simulate the submission. Can be used to test whether the submission would be successful. A successful “pretend” submission does not guarantee that the actual submission will succeed. (Default value = False)
  • **kwargs – Keyword arguments (ignored).
Returns:

True if the submission command succeeds (or in pretend mode).

Return type:

bool

Raises:

SubmitError – If the submission command fails.

class flow.scheduling.slurm.SlurmScheduler(user=None)

Bases: flow.scheduling.base.Scheduler

Implementation of the abstract Scheduler class for SLURM schedulers.

This class can submit cluster jobs to a SLURM scheduler and query their current status.

Parameters:
  • user (str) – Limit the status information to cluster jobs submitted by user.
  • **kwargs – Forwarded to the parent constructor.

classmethod is_present()

Return True if a SLURM scheduler is detected.

jobs()

Yield cluster jobs by querying the scheduler.

submit(script, *, after=None, hold=False, pretend=False, flags=None, **kwargs)

Submit a job script for execution to the scheduler.

Parameters:
  • script (str) – The job script submitted for execution.
  • after (str) – Execute the submitted script after a job with this id has completed. (Default value = None)
  • hold (bool) – Whether to hold the job upon submission. (Default value = False)
  • pretend (bool) – If True, do not actually submit the script, but only simulate the submission. Can be used to test whether the submission would be successful. Note that a successful “pretend” submission does not guarantee that the actual submission will succeed. (Default value = False)
  • flags (list) – Additional arguments to pass through to the scheduler submission command. (Default value = None)
  • **kwargs – Additional keyword arguments (ignored).
Returns:

True if the submission command succeeds (or in pretend mode).

Return type:

bool

Raises:

SubmitError – If the submission command fails.

Error Classes

Definitions of Exception classes used in this package.

exception flow.errors.ConfigKeyError

Bases: KeyError

Indicates that a config key was not found.

exception flow.errors.DirectivesError

Bases: ValueError

Indicates that a directive was incorrectly set.

exception flow.errors.NoSchedulerError

Bases: AttributeError

Indicates that there is no scheduler type defined for an environment class.

exception flow.errors.SubmitError

Bases: RuntimeError

Indicates an error during cluster job submission.

class flow.errors.TemplateError(environment)

Bases: jinja2.ext.Extension

Indicates an error in a jinja2 template.

err(msg, caller)

Raise a template error.

parse(parser)

Call err() when a template raises an Exception.

exception flow.errors.UserConditionError

Bases: RuntimeError

Indicates an error during evaluation of a condition.

exception flow.errors.UserOperationError

Bases: RuntimeError

Indicates an error during execution of a BaseFlowOperation.
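Because these exceptions derive from built-in exception types, existing handlers for those built-ins also catch them. The classes below are standalone mirrors named with a Sketch suffix so the example runs without signac-flow; handler_for() is a hypothetical helper that reports which except clause is reached.

```python
class ConfigKeyErrorSketch(KeyError):
    """Mirror of ConfigKeyError, which derives from KeyError."""


class SubmitErrorSketch(RuntimeError):
    """Mirror of SubmitError, which derives from RuntimeError."""


def handler_for(error):
    """Raise error and report which built-in except clause catches it."""
    try:
        raise error
    except KeyError:
        return "KeyError"
    except RuntimeError:
        return "RuntimeError"


print(handler_for(ConfigKeyErrorSketch("account")))     # KeyError
print(handler_for(SubmitErrorSketch("submit failed")))  # RuntimeError
```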