API Reference

This is the API reference for the signac-flow package.

The FlowProject

Attributes

FlowProject.ALIASES

Default aliases used within the status output.

FlowProject.completed_operations(job)

Determine which operations have been completed for job.

FlowProject.get_job_status(job[, ...])

Return status information about a job.

FlowProject.label([label_name_or_func])

Designate a function as a label function for this class.

FlowProject.labels(job)

Yield all labels for the given job.

FlowProject.main([parser])

Call this function to use the main command line interface.

FlowProject.make_group(name[, options, ...])

Make a FlowGroup named name and return a decorator to make groups.

FlowProject.operation

FlowProject.operation.with_directives(directives)

Decorate a function to make it an operation with additional execution directives.

FlowProject.operation_hooks

alias of flow.project._FlowProjectClass._setup_hooks_object.<locals>._HooksRegister

FlowProject.operation_hooks.on_fail(hook_func)

Add a hook function triggered after the operation exits with an error.

FlowProject.operation_hooks.on_finish(hook_func)

Add a hook function triggered after the operation exits.

FlowProject.operation_hooks.on_start(hook_func)

Add a hook function triggered before an operation starts.

FlowProject.operation_hooks.on_success(hook_func)

Add a hook function triggered after the operation exits without error.

FlowProject.operations

Get the dictionary of operations that have been added to the workflow.

FlowProject.post(condition[, tag])

Define and evaluate postconditions for operations.

FlowProject.post.copy_from(*other_funcs)

Copy postconditions from other operation(s).

FlowProject.post.false(key)

Evaluate if a document key is False for the job(s).

FlowProject.post.isfile(filename)

Determine if the specified file exists for the job(s).

FlowProject.post.never(func)

Return False.

FlowProject.post.not_(condition)

Return not condition(*jobs) for the provided condition function.

FlowProject.post.true(key)

Evaluate if a document key is True for the job(s).

FlowProject.pre(condition[, tag])

Define and evaluate preconditions for operations.

FlowProject.pre.after(*other_funcs)

Precondition to run an operation after other operations.

FlowProject.pre.copy_from(*other_funcs)

Copy preconditions from other operation(s).

FlowProject.pre.false(key)

Evaluate if a document key is False for the job(s).

FlowProject.pre.isfile(filename)

Determine if the specified file exists for the job(s).

FlowProject.pre.never(func)

Return False.

FlowProject.pre.not_(condition)

Return not condition(*jobs) for the provided condition function.

FlowProject.pre.true(key)

Evaluate if a document key is True for the job(s).

FlowProject.print_status([jobs, overview, ...])

Print the status of the project.

FlowProject.project_hooks

hooks.Hooks defined for all project operations.

FlowProject.run([jobs, names, pretend, np, ...])

Execute all eligible operations for the given selection.

FlowProject.scheduler_jobs(scheduler)

Fetch jobs from the scheduler.

FlowProject.submit([bundle_size, jobs, ...])

Submit function for the project's main submit interface.

class flow.FlowProject(config=None, environment=None, entrypoint=None)[source]

Bases: signac.contrib.project.Project

A signac project class specialized for workflow management.

This class is used to define, execute, and submit workflows based on operations and conditions.

Users typically interact with this class through its command line interface.

This is a typical example of how to use this class:

@FlowProject.operation
def hello(job):
    print('hello', job)

FlowProject().main()

Parameters
  • config (signac.contrib.project._ProjectConfig) – A signac configuration, defaults to the configuration loaded from the current directory.

  • environment (flow.environment.ComputeEnvironment) – An environment to use for scheduler submission. If None, the environment is automatically identified. The default is None.

  • entrypoint (dict) – A dictionary with two possible keys: 'executable' and 'path'. The path represents the location of the script file (the script file must call FlowProject.main()). The executable represents the location of the Python interpreter used for the execution of BaseFlowOperation instances that are Python functions.

ALIASES = {'active': 'A', 'error': 'E', 'group_active': 'GA', 'group_error': 'GE', 'group_held': 'GH', 'group_inactive': 'GI', 'group_queued': 'GQ', 'group_registered': 'GR', 'group_submitted': 'GS', 'held': 'H', 'inactive': 'I', 'queued': 'Q', 'registered': 'R', 'submitted': 'S', 'unknown': 'U'}

Default aliases used within the status output.

PRINT_STATUS_ALL_VARYING_PARAMETERS = True

This constant can be used to signal that the print_status() method is supposed to automatically show all varying parameters.

completed_operations(job)[source]

Determine which operations have been completed for job.

Parameters

job (Job) – The signac job handle.

Yields

str – The names of the operations that are complete.

detect_operation_graph()[source]

Determine the directed acyclic graph given by operation conditions.

In general, executing a given operation registered with a FlowProject just involves checking the operation’s preconditions and postconditions to determine eligibility. More generally, however, the preconditions and postconditions define a directed acyclic graph that governs the execution of all operations. Visualizing this graph can be useful for finding logic errors in the specified conditions, and having this graph computed also enables additional execution modes. For example, using this graph it is possible to determine exactly what operations need to be executed in order to make the operation eligible so that the task of executing all necessary operations can be automated.

The graph is determined by iterating over all pairs of operations and checking for equality of preconditions and postconditions. The algorithm builds an adjacency matrix based on whether the preconditions for one operation match the postconditions for another. The comparison of operations is conservative; by default, conditions must be composed of identical code to be identified as equal (technically, they must have equivalent bytecode, i.e. cond1.__code__.co_code == cond2.__code__.co_code). Users can specify that conditions should be treated as equal by providing tags to the operations.

Given a FlowProject subclass defined in a module project.py, the output graph could be visualized using Matplotlib and NetworkX with the following code:

import numpy as np
import networkx as nx
from matplotlib import pyplot as plt

from project import Project

project = Project()
ops = project.operations.keys()
adj = np.asarray(project.detect_operation_graph())

plt.figure()
g = nx.DiGraph(adj)
pos = nx.spring_layout(g)
nx.draw(g, pos)
nx.draw_networkx_labels(
    g, pos,
    labels={key: name for (key, name) in
            zip(range(len(ops)), [o for o in ops])})

plt.show()

Returns

The adjacency matrix of operation dependencies. A zero indicates no dependency, and a 1 indicates dependency. This can be converted to a graph using NetworkX.

Return type

list of lists of int

Raises

RuntimeError – If a condition does not have a tag. This can occur when using functools.partial, and a manually specified condition tag has not been set.

get_job_status(job, ignore_errors=False, cached_status=None)[source]

Return status information about a job.

Parameters
  • job (Job) – The signac job.

  • ignore_errors (bool) – Whether to ignore exceptions raised during status check. (Default value = False)

  • cached_status (dict) – Dictionary of cached status information. The keys are uniquely generated ids for each group and job. The values are instances of JobStatus. (Default value = None)

Returns

A dictionary containing status for the requested job.

Return type

dict

property groups

Get the dictionary of groups that have been added to the workflow.

classmethod label(label_name_or_func=None)[source]

Designate a function as a label function for this class.

For example, we can define a label function like this:

@FlowProject.label
def foo_label(job):
    if job.document.get('foo', False):
        return 'foo-label-text'

The foo-label-text label will now show up in the status view for each job whose foo key evaluates to True.

If the label function returns any type other than str, the label name will be the name of the function if and only if the return value evaluates to True, for example:

@FlowProject.label
def foo_label(job):
    return job.document.get('foo', False)

Finally, specify a label name by providing it as the first argument to the label() decorator.

Parameters

label_name_or_func (str or callable) – A label name or callable. (Default value = None)

Returns

A decorator for the label function.

Return type

callable

labels(job)[source]

Yield all labels for the given job.

See also: label().

Parameters

job (signac.contrib.job.Job) – Job handle.

Yields

str – Label value.
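
For example, the labels for every job in a project can be printed with a short sketch like the following, where MyProject is assumed to be a FlowProject subclass defined elsewhere:

project = MyProject()
for job in project:
    print(job.id, list(project.labels(job)))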

main(parser=None)[source]

Call this function to use the main command line interface.

In most cases one would want to call this function as part of the class definition:

# my_project.py
from flow import FlowProject

class MyProject(FlowProject):
    pass

if __name__ == '__main__':
    MyProject().main()

The project can then be executed on the command line:

$ python my_project.py --help

Parameters

parser (argparse.ArgumentParser) – The argument parser used to implement the command line interface. If None, a new parser is constructed. (Default value = None)

classmethod make_group(name, options='', group_aggregator=None)[source]

Make a FlowGroup named name and return a decorator to make groups.

A FlowGroup is used to group operations together for running and submitting _JobOperations.

Examples

The code below creates a group and adds an operation to that group.

example_group = FlowProject.make_group('example')

@example_group
@FlowProject.operation
def foo(job):
    return "hello world"

Parameters
  • name (str) – The name of the FlowGroup.

  • options (str) – A string to append to submissions. Can be any valid FlowOperation.run() option. (Default value = “”)

  • group_aggregator (aggregator) – An instance of aggregator to associate with the FlowGroup. If None, no aggregation takes place (Default value = None).

Returns

The created group.

Return type

FlowGroupEntry
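
Once created, the group can be addressed by its name on the command line in place of individual operation names. For example, assuming the group above is defined in a project module saved as project.py:

$ python project.py run -o example
$ python project.py submit -o example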

property operations

Get the dictionary of operations that have been added to the workflow.

print_status(jobs=None, overview=True, overview_max_lines=None, detailed=False, parameters=None, param_max_width=None, expand=False, all_ops=False, only_incomplete=False, dump_json=False, unroll=True, compact=False, pretty=False, file=None, err=None, ignore_errors=False, template=None, profile=False, eligible_jobs_max_lines=None, output_format='terminal')[source]

Print the status of the project.

Parameters
  • jobs (iterable of Job or aggregates) – If None, print status for all jobs/aggregates. If not None, only print status for the given jobs or aggregates (Default value = None).

  • overview (bool) – Display an overview of the project status. (Default value = True)

  • overview_max_lines (int) – Limit the number of overview lines. (Default value = None)

  • detailed (bool) – Print a detailed status of each job. (Default value = False)

  • parameters (list of str) – Print the value of the specified parameters. (Default value = None)

  • param_max_width (int) – Limit the number of characters of parameter columns. (Default value = None)

  • expand (bool) – Present labels and operations in two separate tables. (Default value = False)

  • all_ops (bool) – Include operations that are not eligible to run. (Default value = False)

  • only_incomplete (bool) – Only show jobs that have eligible operations. (Default value = False)

  • dump_json (bool) – Output the data as JSON instead of printing the formatted output. (Default value = False)

  • unroll (bool) – Separate columns for jobs and the corresponding operations. (Default value = True)

  • compact (bool) – Print a compact version of the output. (Default value = False)

  • pretty (bool) – Prettify the output. (Default value = False)

  • file (str) – Redirect all output to this file, defaults to sys.stdout.

  • err (str) – Redirect all error output to this file, defaults to sys.stderr.

  • ignore_errors (bool) – Print status even if querying the scheduler fails. (Default value = False)

  • template (str) – User provided Jinja2 template file. (Default value = None)

  • profile (bool) – Show profile result. (Default value = False)

  • eligible_jobs_max_lines (int) – Limit the number of operations and its eligible job count printed in the overview. (Default value = None)

  • output_format (str) – Status output format, supports: ‘terminal’ (default), ‘markdown’ or ‘html’.
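
For example, the following sketch prints a detailed, prettified status with a state point parameter column; "temperature" is a hypothetical state point key and MyProject is assumed to be a FlowProject subclass:

project = MyProject()
project.print_status(detailed=True, parameters=['temperature'], pretty=True)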

property project_hooks

hooks.Hooks defined for all project operations.

Project-wide hooks are added to an instance of the FlowProject, not the class. For example:

def finish_hook(operation_name, job):
    print(f"Finished operation {operation_name} on job {job.id}")

if __name__ == "__main__":
    project = FlowProject()
    project.project_hooks.on_finish.append(finish_hook)
    project.main()

run(jobs=None, names=None, pretend=False, np=None, timeout=None, num=None, num_passes=1, progress=False, order=None, ignore_conditions=IgnoreConditions.NONE)[source]

Execute all eligible operations for the given selection.

This function will run in a loop until all eligible operations have been executed, or until it reaches the maximum number of passes per operation or the maximum number of executions.

By default there is no limit on the total number of executions, but a specific operation will only be executed once per job. This is to avoid accidental infinite loops when no or faulty postconditions are provided.

Parameters
  • jobs (iterable of Job or aggregates) – If None, execute operations for all eligible jobs/aggregates. If not None, only execute operations for the given jobs or aggregates (Default value = None).

  • names (iterable of str) – Only execute operations that match the provided set of names (interpreted as regular expressions), or all if the argument is None. (Default value = None)

  • pretend (bool) – Do not actually execute the operations, but show the commands that would have been executed. (Default value = False)

  • np (int) – Parallelize to the specified number of processors. Use -1 to parallelize over all available processors. The value None uses one processor (Default value = None).

  • timeout (float) – An optional timeout for each operation in seconds after which execution will be cancelled. Use None to indicate no timeout (Default value = None).

  • num (int) – The total number of operations that are executed will not exceed this argument if provided. (Default value = None)

  • num_passes (int or None) – The total number of executions of one specific job-operation pair will not exceed this argument. The default is 1; there is no limit if this argument is None.

  • progress (bool) – Show a progress bar during execution. (Default value = False)

  • order (str, callable, or None) –

    Specify the order of operations. Possible values are:

    • 'none' or None (no specific order)

    • 'by-job' (operations are grouped by job)

    • 'cyclic' (order operations cyclically by job)

    • 'random' (shuffle the execution order randomly)

    • callable (a callable returning a comparison key used to sort operations)

    The default value is 'none', which is equivalent to 'by-job' in the current implementation.

    Note

    Users are advised to not rely on a specific execution order as a substitute for defining the workflow in terms of preconditions and postconditions. However, a specific execution order may be more performant in cases where operations need to access and potentially lock shared resources.

  • ignore_conditions (IgnoreConditions) – Specify if preconditions and/or postconditions are to be ignored when determining eligibility. The default is IgnoreConditions.NONE.
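
For example, the following sketch executes at most 10 eligible operations whose names match 'hello', parallelized over 2 processors and with a progress bar; MyProject is assumed to be a FlowProject subclass:

project = MyProject()
project.run(names=['hello'], np=2, num=10, progress=True)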

scheduler_jobs(scheduler)[source]

Fetch jobs from the scheduler.

This function will fetch all scheduler jobs from the scheduler and also expand bundled jobs automatically.

However, this function will not automatically filter scheduler jobs which are not associated with this project.

Parameters

scheduler (Scheduler) – The scheduler instance.

Yields

ClusterJob – All cluster jobs fetched from the scheduler.

submit(bundle_size=1, jobs=None, names=None, num=None, parallel=False, force=False, ignore_conditions=IgnoreConditions.NONE, ignore_conditions_on_execution=IgnoreConditions.NONE, **kwargs)[source]

Submit function for the project’s main submit interface.

Parameters
  • bundle_size (int) – Specify the number of operations to be bundled into one submission, defaults to 1.

  • jobs (iterable of Job or aggregates) – If None, submit operations for all eligible jobs/aggregates. If not None, only submit operations for the given jobs or aggregates (Default value = None).

  • names (iterable of str) – Only submit operations that match the provided set of names (interpreted as regular expressions), or all if the argument is None. (Default value = None)

  • num (int) – Limit the total number of submitted operations, defaults to no limit.

  • parallel (bool) – Execute all bundled operations in parallel. (Default value = False)

  • force (bool) – Ignore all warnings or checks during submission, just submit. (Default value = False)

  • ignore_conditions (IgnoreConditions) – Specify if preconditions and/or postconditions are to be ignored when determining eligibility. The default is IgnoreConditions.NONE.

  • ignore_conditions_on_execution (IgnoreConditions) – Specify if preconditions and/or postconditions are to be ignored when determining eligibility after submitting. The default is IgnoreConditions.NONE.

  • **kwargs – Additional keyword arguments forwarded to submit().
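
For example, the following sketch submits eligible operations in bundles of 4 operations each, with the operations in each bundle executing in parallel; MyProject is assumed to be a FlowProject subclass:

project = MyProject()
project.submit(bundle_size=4, parallel=True)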

FlowProject.operation(func, name=None)

Add operation functions to the class workflow definition.

This object is designed to be used as a decorator, for example:

@FlowProject.operation
def hello(job):
    print('Hello', job)

Directives can also be specified by using FlowProject.operation.with_directives().

@FlowProject.operation.with_directives({"nranks": 4})
def mpi_hello(job):
    print("hello")

Parameters
  • func (callable) – The function to add to the workflow.

  • name (str) – The operation name. Uses the name of the function if None. (Default value = None)

Returns

The operation function.

Return type

callable

operation.with_directives(directives, name=None)

Decorate a function to make it an operation with additional execution directives.

Directives can be used to provide information about required resources such as the number of processors required for execution of parallelized operations. For more information, see Submission Directives. To apply directives to an operation that is part of a group, use FlowGroupEntry.with_directives().

Parameters
  • directives (dict) – Directives to use for resource requests and execution.

  • name (str) – The operation name. Uses the name of the function if None (Default value = None).

Returns

A decorator which registers the function with the provided name and directives as an operation of the FlowProject subclass.

Return type

function

Supported Directives:

executable

Return the path to the executable to be used for an operation.

The executable directive expects a string pointing to a valid executable file in the current file system.

When called, by default this should point to a Python executable (interpreter); however, if the FlowProject path is an empty string, the executable can be a path to an executable Python script. Defaults to sys.executable.

fork

The fork directive can be set to True to enforce that a particular operation is always executed within a subprocess and not within the Python interpreter’s process even if there are no other reasons that would prevent that.

Note

Setting fork=False will not prevent forking if there are other reasons for forking, such as a timeout.

memory

The memory to request for this operation.

The requested memory should be specified as a float, int, or string. A valid memory argument is defined as:

  • Positive numeric value with suffix “g” or “G” indicating memory requested in gigabytes.

For example:

@Project.operation.with_directives({"memory": "4g"})
def op(job):
    pass

  • Positive numeric value with suffix “m” or “M” indicating memory requested in megabytes.

For example:

@Project.operation.with_directives({"memory": "512m"})
def op(job):
    pass

  • Positive numeric value with no suffix indicating memory requested in gigabytes.

For example:

@Project.operation.with_directives({"memory": "4"})
def op1(job):
    pass

@Project.operation.with_directives({"memory": 4})
def op2(job):
    pass

ngpu

The number of GPUs to use for this operation.

Expects a nonnegative integer. Defaults to 0.

np

The number of tasks to launch for a given operation, i.e., the number of CPU cores to request for it.

Expects a natural number (i.e. an integer >= 1). This directive introspects into the “nranks” or “omp_num_threads” directives and uses their product if it is greater than the current set value. Defaults to 1.

nranks

The number of MPI ranks to use for this operation. Defaults to 0.

Expects a nonnegative integer.

omp_num_threads

The number of OpenMP threads to use for this operation. Defaults to 0.

Expects a nonnegative integer.

processor_fraction

Fraction of a resource to use on a single operation.

If set to 0.5 for a bundled job with 20 operations (all with ‘np’ set to 1), 10 CPUs will be used. Defaults to 1.

Note

This can be particularly useful on Stampede2’s launcher.

walltime

The number of hours to request for executing this job.

This directive expects a float representing the walltime in hours. Fractional values are supported. For example, a value of 0.5 will request 30 minutes of walltime. If no walltime is requested, the submission will not specify one in the output script. Some schedulers have a default value that will be used.

For example:

@Project.operation.with_directives({"walltime": 24})
def op(job):
    # This operation takes 1 day to run
    pass

FlowProject.operation_hooks(hook_func, trigger)

Add hooks to an operation.

This object is designed to be used as a decorator. The example below shows an operation level decorator that prints the operation name and job id at the start of the operation execution.

def start_hook(operation_name, job):
    print(f"Starting operation {operation_name} on job {job.id}.")

@FlowProject.operation_hooks.on_start(start_hook)
@FlowProject.operation
def foo(job):
    pass

A hook is a function that is called at specific points during the execution of a job operation. In the example above, the start_hook hook function is executed before the operation foo runs. Hooks can also run after an operation finishes, when an operation exits with error, or when an operation exits without error.

The available triggers are on_start, on_finish, on_fail, and on_success, which run when the operation starts, completes, fails, and succeeds, respectively.

Parameters
  • hook_func (callable) – The function that will be executed at a specified point.

  • trigger (str) – The point at which the hook is executed.

classmethod operation_hooks.on_fail(hook_func)

Add a hook function triggered after the operation exits with an error.

classmethod operation_hooks.on_finish(hook_func)

Add a hook function triggered after the operation exits.

The hook is triggered regardless of whether the operation exits with or without an error.

classmethod operation_hooks.on_start(hook_func)

Add a hook function triggered before an operation starts.

classmethod operation_hooks.on_success(hook_func)

Add a hook function triggered after the operation exits without error.

FlowProject.post(tag=None)

Define and evaluate postconditions for operations.

A postcondition is a function accepting one or more jobs as positional arguments (*jobs) that must evaluate to True for this operation to be considered complete. For example:

@Project.operation
@Project.post(lambda job: job.doc.get('bye'))
def bye(job):
    print('bye', job)
    job.doc.bye = True

@Project.operation
@aggregator()
@Project.post(lambda *jobs: all("bye_all" in job.doc for job in jobs))
def bye_all(*jobs):
    print('bye', jobs)
    for job in jobs:
        job.doc.bye_all = True

The bye operation would be considered complete and therefore no longer eligible for execution once the ‘bye’ key in the job document evaluates to True. Similarly, the bye_all operation would be considered complete and therefore no longer eligible for execution only if the ‘bye_all’ key is present in all of the jobs passed.

An optional tag may be associated with the condition. These tags are used by detect_operation_graph() when comparing conditions for equality. The tag defaults to the bytecode of the function.

classmethod post.copy_from(*other_funcs)

Copy postconditions from other operation(s).

True if and only if all postconditions of other operation function(s) are met.

classmethod post.false(key)

Evaluate if a document key is False for the job(s).

Returns True if the specified key is present in the job document(s) and evaluates to False.

classmethod post.isfile(filename)

Determine if the specified file exists for the job(s).

classmethod post.never(func)

Return False.

classmethod post.not_(condition)

Return not condition(*jobs) for the provided condition function.

classmethod post.true(key)

Evaluate if a document key is True for the job(s).

Returns True if the specified key is present in the job document(s) and evaluates to True.
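
The built-in conditions can be combined with each other and with custom conditions. In the sketch below, the hypothetical operation process is eligible while the file input.txt exists in the job's workspace and is considered complete once the 'processed' document key evaluates to True:

@Project.operation
@Project.pre.isfile('input.txt')
@Project.post.true('processed')
def process(job):
    job.doc.processed = True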

FlowProject.pre(tag=None)

Define and evaluate preconditions for operations.

A precondition is a function accepting one or more jobs as positional arguments (*jobs) that must evaluate to True for this operation to be eligible for execution. For example:

@Project.operation
@Project.pre(lambda job: not job.doc.get('hello'))
def hello(job):
    print('hello', job)
    job.doc.hello = True

@Project.operation
@aggregator()
@Project.pre(lambda *jobs: all("hi_all" not in job.doc for job in jobs))
def hi_all(*jobs):
    print('hi', jobs)
    for job in jobs:
        job.doc.hi_all = True

The hello operation would only execute if the ‘hello’ key in the job document does not evaluate to True. Similarly, the hi_all operation would execute only if the ‘hi_all’ key is not present in all of the jobs passed.

An optional tag may be associated with the condition. These tags are used by detect_operation_graph() when comparing conditions for equality. The tag defaults to the bytecode of the function.

classmethod pre.after(*other_funcs)

Precondition to run an operation after other operations.

True if and only if all postconditions of other operation function(s) are met.

classmethod pre.copy_from(*other_funcs)

Copy preconditions from other operation(s).

True if and only if all preconditions of other operation function(s) are met.

classmethod pre.false(key)

Evaluate if a document key is False for the job(s).

Returns True if the specified key is present in the job document(s) and evaluates to False.

classmethod pre.isfile(filename)

Determine if the specified file exists for the job(s).

classmethod pre.never(func)

Return False.

classmethod pre.not_(condition)

Return not condition(*jobs) for the provided condition function.

classmethod pre.true(key)

Evaluate if a document key is True for the job(s).

Returns True if the specified key is present in the job document(s) and evaluates to True.
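
For example, pre.after() can be used to chain operations. The sketch below makes a hypothetical operation goodbye eligible only once the postconditions of the hello operation defined above are met:

@Project.operation
@Project.pre.after(hello)
def goodbye(job):
    print('goodbye', job)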

class flow.IgnoreConditions(value)[source]

Bases: enum.IntFlag

Flags that determine which conditions are used to determine job eligibility.

ALL = 3

Ignore all conditions.

NONE = 0

Check all conditions.

POST = 2

Ignore postconditions.

PRE = 1

Ignore preconditions.
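
Because IgnoreConditions is an IntFlag, its values can be combined with the bitwise-or operator; IgnoreConditions.PRE | IgnoreConditions.POST is equivalent to IgnoreConditions.ALL. For example, the sketch below re-runs operations even when their postconditions are already met, assuming project is a FlowProject instance:

from flow import IgnoreConditions

project.run(ignore_conditions=IgnoreConditions.POST)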

Operations and Status

class flow.project.BaseFlowOperation(pre=None, post=None)[source]

Bases: object

A BaseFlowOperation represents a data space operation acting on any job.

Every BaseFlowOperation is associated with a specific command.

Preconditions (pre) and postconditions (post) can be used to trigger an operation only when certain conditions are met. Conditions are unary callables, which expect an instance of job as their first and only positional argument and return either True or False.

An operation is considered “eligible” for execution when all preconditions are met and when at least one of the postconditions is not met. Preconditions are always met when the list of preconditions is empty. Postconditions are never met when the list of postconditions is empty.

Note

This class should not be instantiated directly.

Parameters
  • pre (sequence of callables) – List of preconditions.

  • post (sequence of callables) – List of postconditions.

class flow.project.FlowOperation(op_func, pre=None, post=None)[source]

Bases: flow.project.BaseFlowOperation

An operation that executes a Python function.

All operations without the @cmd directive use this class. The callable op_func should be a function of one or more instances of Job.

Note

This class should not be instantiated directly.

Parameters
  • op_func (callable) – A callable function of *jobs.

  • pre (sequence of callables) – List of preconditions.

  • post (sequence of callables) – List of postconditions.

__call__(*jobs)[source]

Call the operation on the provided jobs.

Parameters

*jobs (One or more instances of Job.) – The jobs passed to the operation.

Returns

The result of the operation function.

Return type

object

class flow.project.FlowCmdOperation(cmd, pre=None, post=None)[source]

Bases: flow.project.BaseFlowOperation

An operation that executes a shell command.

When an operation has the @cmd directive specified, it is instantiated as a FlowCmdOperation. The operation should be a function of one or more positional arguments that are instances of Job. The command (cmd) may either be a callable that expects one or more instances of Job as positional arguments and returns a string containing valid shell commands, or the string of commands itself. In either case, the resulting string may contain any attributes of the job (or jobs) placed in curly braces, which will then be substituted by Python string formatting.

Note

This class should not be instantiated directly.

Parameters
  • cmd (str or callable) – The command to execute the operation. Callable values will be provided one or more positional arguments (*jobs) that are instances of Job. String values will be formatted with cmd.format(jobs=jobs) where jobs is a tuple of Job, or cmd.format(jobs=jobs, job=job) if only one job is provided.

  • pre (sequence of callables) – List of preconditions.

  • post (sequence of callables) – List of postconditions.

__call__(*jobs)[source]

Return the command formatted with the supplied job(s).

Labels

class flow.label(name=None)[source]

Bases: object

Decorate a FlowProject class function as a label function.

For example:

class MyProject(FlowProject):

    @label()
    def foo(self, job):
        return True

__call__(func)[source]

Add the function as a label.

This call operator allows the class to be used as a decorator.

Parameters

func (callable) – The function to decorate.

Returns

The decorated function.

Return type

callable

class flow.staticlabel(name=None)[source]

Bases: flow.labels.label

A label decorator for staticmethods.

This decorator implies “staticmethod”!
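
For example, a minimal sketch in which 'processed' is a hypothetical document key:

class MyProject(FlowProject):

    @staticlabel()
    def processed(job):
        return 'processed' in job.document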

__call__(func)[source]

Add the function as a label.

This call operator allows the class to be used as a decorator.

Parameters

func (callable) – The function to decorate.

Returns

The decorated function.

Return type

callable

class flow.classlabel(name=None)[source]

Bases: flow.labels.label

A label decorator for classmethods.

This decorator implies “classmethod”!

__call__(func)[source]

Add the function as a label.

This call operator allows the class to be used as a decorator.

Parameters

func (callable) – The function to decorate.

Returns

The decorated function.

Return type

callable

@flow.cmd

flow.cmd(func)[source]

Indicate that func returns a shell command with this decorator.

If this function is an operation function defined by FlowProject, it will be interpreted to return a shell command, instead of executing the function itself.

For example:

@FlowProject.operation
@flow.cmd
def hello(job):
    return "echo {job.id}"

Note

The final shell command generated for run() or submit() still respects directives and will prepend e.g. MPI or OpenMP prefixes to the shell command provided here.

@flow.with_job

flow.with_job(func)[source]

Use arg as a context manager for func(arg) with this decorator.

This decorator can only be used for operations that accept a single job as a parameter.

If this function is an operation function defined by FlowProject, it will be the same as using with job:.

For example:

@FlowProject.operation
@flow.with_job
def hello(job):
    print("hello {}".format(job))

Is equivalent to:

@FlowProject.operation
def hello(job):
    with job:
        print("hello {}".format(job))

This also works with the @cmd decorator:

@FlowProject.operation
@with_job
@cmd
def hello(job):
    return "echo 'hello {}'".format(job)

Is equivalent to:

@FlowProject.operation
@cmd
def hello_cmd(job):
    return 'trap "cd `pwd`" EXIT && cd {} && echo "hello {}"'.format(job.ws, job)

@flow.directives

class flow.directives(**kwargs)[source]

Bases: object

Decorator for operation functions to provide additional execution directives.

Directives can for example be used to provide information about required resources such as the number of processes required for execution of parallelized operations. For more information, read about Submission Directives.

Deprecated since version 0.15: This decorator is deprecated and will be removed in 1.0. Use FlowProject.operation.with_directives instead.

Supported Directives:

executable

Return the path to the executable to be used for an operation.

The executable directive expects a string pointing to a valid executable file in the current file system.

When called, by default this should point to a Python executable (interpreter); however, if the FlowProject path is an empty string, the executable can be a path to an executable Python script. Defaults to sys.executable.

fork

The fork directive can be set to True to enforce that a particular operation is always executed within a subprocess and not within the Python interpreter’s process even if there are no other reasons that would prevent that.

Note

Setting fork=False will not prevent forking if there are other reasons for forking, such as a timeout.

memory

The memory to request for this operation.

The requested memory should be specified as a float, int, or string. A valid memory argument is defined as:

  • Positive numeric value with suffix “g” or “G” indicating memory requested in gigabytes.

For example:

@Project.operation.with_directives({"memory": "4g"})
def op(job):
    pass

  • Positive numeric value with suffix “m” or “M” indicating memory requested in megabytes.

For example:

@Project.operation.with_directives({"memory": "512m"})
def op(job):
    pass

  • Positive numeric value with no suffix indicating memory requested in gigabytes.

For example:

@Project.operation.with_directives({"memory": "4"})
def op1(job):
    pass

@Project.operation.with_directives({"memory": 4})
def op2(job):
    pass

ngpu

The number of GPUs to use for this operation.

Expects a nonnegative integer. Defaults to 0.

np

The number of tasks to launch for a given operation, i.e., the number of CPU cores to request for it.

Expects a natural number (i.e. an integer >= 1). This directive introspects into the “nranks” or “omp_num_threads” directives and uses their product if it is greater than the current set value. Defaults to 1.

nranks

The number of MPI ranks to use for this operation. Defaults to 0.

Expects a nonnegative integer.

omp_num_threads

The number of OpenMP threads to use for this operation. Defaults to 0.

Expects a nonnegative integer.

processor_fraction

Fraction of a resource to use on a single operation.

If set to 0.5 for a bundled job with 20 operations (all with ‘np’ set to 1), 10 CPUs will be used. Defaults to 1.

Note

This can be particularly useful on Stampede2’s launcher.

walltime

The number of hours to request for executing this job.

This directive expects a float representing the walltime in hours. Fractional values are supported. For example, a value of 0.5 will request 30 minutes of walltime. If no walltime is requested, the submission will not specify one in the output script. Some schedulers have a default value that will be used.

For example:

@Project.operation.with_directives({"walltime": 24})
def op(job):
    # This operation takes 1 day to run
    pass

__call__(func)[source]

Add directives to the function.

This call operator allows the class to be used as a decorator.

Parameters

func (callable) – The function to decorate.

Returns

The decorated function.

Return type

callable

classmethod copy_from(func)[source]

Copy directives from another operation.

flow.init()

flow.init(alias=None, template=None, root=None)[source]

Initialize a templated FlowProject module.

Parameters
  • alias (str) – Python identifier used as a file name for the template output. Uses "project" if None. (Default value = None)

  • template (str) –

    Name of the template to use. Built-in templates are:

    • "minimal"

    • "example"

    • "testing"

    Uses "minimal" if None. (Default value = None)

  • root (str) – Directory where the output file is placed. Uses the current working directory if None. (Default value = None)

Returns

List of file names created.

Return type

list
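
For example, a minimal sketch that creates a project module from the "example" template in the current working directory:

import flow

flow.init(alias='my_project', template='example')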

flow.get_environment()

flow.get_environment(test=False, import_configured=True)[source]

Attempt to detect the present environment.

This function iterates through all defined ComputeEnvironment classes in reversed order of definition and returns the first environment where the is_present() method returns True.

Parameters
  • test (bool) – Whether to return the TestEnvironment. (Default value = False)

  • import_configured (bool) – Whether to import environments specified in the flow configuration. (Default value = True)

Returns

The detected environment class.

Return type

ComputeEnvironment
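
For example, the detected environment class can be inspected with a sketch like the following:

import flow

env = flow.get_environment()
print(env.__name__)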

The FlowGroup

class flow.project.FlowGroup(name, operations=None, operation_directives=None, options='')[source]

Bases: object

A FlowGroup represents a subset of a workflow for a project.

A FlowGroup is associated with one or more instances of BaseFlowOperation.

Examples

In the example below, the directives will be {'nranks': 4} for op1 and {'nranks': 2, 'executable': 'python3'} for op2.

group = FlowProject.make_group(name='example_group')

@group.with_directives({"nranks": 4})
@FlowProject.operation.with_directives({"nranks": 2, "executable": "python3"})
def op1(job):
    pass

@group
@FlowProject.operation.with_directives({"nranks": 2, "executable": "python3"})
def op2(job):
    pass

Parameters
  • name (str) – The name of the group to be used when calling from the command line.

  • operations (dict) – A dictionary of operations where the keys are operation names and each value is a BaseFlowOperation.

  • operation_directives (dict) – A dictionary of additional parameters that provide instructions on how to execute a particular operation, e.g., specifically required resources. Operation names are keys and the dictionaries of directives are values. If an operation does not have directives specified, then the directives of the singleton group containing that operation are used. To prevent this, set the directives to an empty dictionary for that operation.

  • options (str) – A string of options to append to the output of the object’s call method. This allows options like --num_passes to be given to a group.

add_operation(name, operation, directives=None)[source]

Add an operation to the FlowGroup.

Parameters
  • name (str) – The name of the operation.

  • operation (BaseFlowOperation) – The workflow operation to add to the FlowGroup.

  • directives (dict) – The operation specific directives. (Default value = None)

isdisjoint(group)[source]

Return whether two groups are disjoint.

Groups are disjoint if they do not share any common operations.

Parameters

group (FlowGroup) – The other FlowGroup to compare to.

Returns

Returns True if group and self share no operations, otherwise returns False.

Return type

bool

class flow.project.FlowGroupEntry(name, project, options='', group_aggregator=None)[source]

Bases: object

A FlowGroupEntry registers operations for inclusion into a FlowGroup.

Application developers should not directly instantiate this class, but use make_group() instead.

Operation functions can be marked for inclusion into a FlowGroup by decorating the functions with a corresponding FlowGroupEntry. If the operation requires specific directives, with_directives() accepts keyword arguments that are mapped to directives and returns a decorator that can be applied to the operation to mark it for inclusion in the group and indicate that it should be executed using the specified directives. This overrides the default directives specified by flow.directives().

Parameters
  • name (str) – The name of the FlowGroup to be created.

  • project (flow.FlowProject) – The project the group is associated with.

  • options (str) – The FlowProject.run() options to pass when submitting the group. These will be included in all submissions. Submissions use run commands to execute.

  • group_aggregator (aggregator) – aggregator object associated with the FlowGroup (Default value = None).

__call__(func)[source]

Add the function into the group’s operations.

This call operator allows the class to be used as a decorator.

Parameters

func (callable) – The function to decorate.

Returns

The decorated function.

Return type

callable

with_directives(directives)[source]

Decorate an operation to provide additional execution directives for this group.

Directives can be used to provide information about required resources such as the number of processors required for execution of parallelized operations. For a list of supported directives, see FlowProject.operation.with_directives(). For more information, see Submission Directives.

The directives specified in this decorator are only applied when executing the operation through the FlowGroup. To apply directives to an individual operation executed outside of the group, see FlowProject.operation.with_directives().

Parameters

directives (dict) – Directives to use for resource requests and execution.

Returns

A decorator which registers the operation with the group using the specified directives.

Return type

function

Aggregation

class flow.aggregator(aggregator_function=None, sort_by=None, sort_ascending=True, select=None)[source]

Bases: object

Decorator for operation functions that operate on aggregates.

By default, if the aggregator_function is None, an aggregate of all jobs will be created.

Examples

The code block below defines a FlowOperation that prints the total length of the provided aggregate of jobs.

@aggregator()
@FlowProject.operation
def foo(*jobs):
    print(len(jobs))

Parameters
  • aggregator_function (callable or None) – A callable that performs aggregation of jobs. It takes in a list of jobs and can return or yield subsets of jobs as an iterable. The default behavior is creating a single aggregate of all jobs.

  • sort_by (str, callable, or None) – Before aggregating, sort the jobs by a given statepoint parameter. If the argument is a string, jobs are sorted by that state point key. If the argument is callable, this will be passed as the key argument to sorted(). If None, no sorting is performed (Default value = None).

  • sort_ascending (bool) – True if the jobs are to be sorted in ascending order (Default value = True).

  • select (callable or None) – Condition for filtering individual jobs. This is passed as the function argument to filter(). If None, no filtering is performed (Default value = None).

__call__(func=None)[source]

Add this aggregator to a provided operation.

This call operator allows the class to be used as a decorator.

Parameters

func (callable) – The function to decorate.

classmethod groupby(key, default=None, sort_by=None, sort_ascending=True, select=None)[source]

Aggregate jobs according to matching state point values.

Examples

The code block below provides an example of how to aggregate jobs by a state point parameter "key". If the state point does not contain the key "key", a default value of -1 is used.

@aggregator.groupby(key="key", default=-1)
@FlowProject.operation
def foo(*jobs):
    print(len(jobs))

Parameters
  • key (str, Iterable[str], or callable) – The method by which jobs are grouped. It may be a state point key or an iterable of state point keys whose values define the groupings. It may also be an arbitrary callable of Job when greater flexibility is needed.

  • default (Any) – Default value used for grouping if the key is missing or invalid. If key is an iterable, the default value must be a sequence of equal length. If key is a callable, this argument is ignored. If None, the provided keys must exist for all jobs (Default value = None).

  • sort_by (str, callable, or None) – Before aggregating, sort the jobs by a given statepoint parameter. If the argument is a string, jobs are sorted by that state point key. If the argument is callable, this will be passed as the key argument to sorted(). If None, no sorting is performed (Default value = None).

  • sort_ascending (bool) – True if the jobs are to be sorted in ascending order (Default value = True).

  • select (callable or None) – Condition for filtering individual jobs. This is passed as the function argument to filter(). If None, no filtering is performed (Default value = None).

Returns

aggregator – The groupby() aggregator.

Return type

aggregator

classmethod groupsof(num=1, sort_by=None, sort_ascending=True, select=None)[source]

Aggregate jobs into groupings of a given size.

By default, creates aggregates consisting of a single job.

If the number of jobs present in the project is not divisible by the number provided by the user, the last aggregate will be smaller and contain the remaining jobs. For instance, if 10 jobs are present in a project and they are aggregated in groups of 3, then the generated aggregates will have lengths 3, 3, 3, and 1.

Examples

The code block below shows how to aggregate jobs in groups of 2.

@aggregator.groupsof(num=2)
@FlowProject.operation
def foo(*jobs):
    print(len(jobs))

Parameters
  • num (int) – The default size of aggregates. The final aggregate contains the remaining jobs and may have fewer than num jobs.

  • sort_by (str, callable, or None) – Before aggregating, sort the jobs by a given statepoint parameter. If the argument is a string, jobs are sorted by that state point key. If the argument is callable, this will be passed as the key argument to sorted(). If None, no sorting is performed (Default value = None).

  • sort_ascending (bool) – True if the jobs are to be sorted in ascending order (Default value = True).

  • select (callable or None) – Condition for filtering individual jobs. This is passed as the function argument to filter(). If None, no filtering is performed (Default value = None).

Returns

aggregator – The groupsof() aggregator.

Return type

aggregator

flow.get_aggregate_id(aggregate)[source]

Generate aggregate id for an aggregate of jobs.

The aggregate id is a unique hash identifying a tuple of jobs. The aggregate id is sensitive to the order of the jobs in the aggregate. The id of an aggregate containing one job is that job’s id (the hash of its state point).

Parameters

aggregate (tuple of Job) – Aggregate of signac jobs.

Returns

The generated aggregate id.

Return type

str
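
For example, a minimal sketch that computes the id of an aggregate containing all jobs, assuming project is a FlowProject instance:

from flow import get_aggregate_id

aggregate = tuple(project)
print(get_aggregate_id(aggregate))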

Compute Environments

Detection of compute environments.

This module provides the ComputeEnvironment class, which can be subclassed to automatically detect specific computational environments.

This enables the user to adjust their workflow based on the present environment, e.g. for the adjustment of scheduler submission scripts.

flow.environment.setup(py_modules, **attrs)[source]

Set up user-defined environment modules.

Use this function in place of setuptools.setup() to not only install an environment’s module, but also register it with the global signac configuration. Once registered, the environment is automatically imported when the get_environment() function is called.

flow.environment.template_filter(func)[source]

Decorate a function as a ComputeEnvironment template filter.

This decorator is applied to methods defined in a subclass of ComputeEnvironment that are used in that environment’s templates. The decorated function becomes a class method that is available as a jinja2 filter in templates rendered by a FlowProject with that ComputeEnvironment.

Parameters

func (callable) – Function to decorate.

Returns

Decorated function.

Return type

callable

class flow.environment.ComputeEnvironment[source]

Bases: object

Define computational environments.

The ComputeEnvironment class allows us to automatically determine specific environments in order to programmatically adjust workflows in different environments.

The default method for the detection of a specific environment is to provide a regular expression matching the environment’s hostname. For example, if the hostname is my-server.com, one could identify the environment by setting the hostname_pattern to 'my-server'.
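
For example, a minimal sketch of a custom environment detected by hostname; the class name, pattern, and template file name are assumptions, and the template file must exist in the project's template search path:

from flow.environment import ComputeEnvironment

class MyServerEnvironment(ComputeEnvironment):
    # Detected when the hostname matches this regular expression.
    hostname_pattern = r'.*\.my-server\.com'
    # Submission script template used for this environment.
    template = 'my-server.sh'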

classmethod add_args(parser)[source]

Add arguments related to this compute environment to an argument parser.

Parameters

parser (argparse.ArgumentParser) – The argument parser where arguments will be added.

classmethod get_config_value(key, default=<flow.util.config._GetConfigValueNoneType object>)[source]

Request a value from the user’s configuration.

This method should be used whenever values need to be provided that are specific to a user’s environment, e.g. account names.

When a key is not configured and no default value is provided, a SubmitError will be raised and the user will be prompted to add the missing key to their configuration.

Please note that the key will be automatically expanded to be specific to this environment definition. For example, a key should be 'account', not 'MyEnvironment.account'.

Parameters
  • key (str) – The environment specific configuration key.

  • default (str) – A default value in case the key cannot be found within the user’s configuration.

Returns

The value or default value.

Return type

object

Raises

SubmitError – If the key is not in the user’s configuration and no default value is provided.

classmethod get_prefix(operation, parallel=False, mpi_prefix=None, cmd_prefix=None)[source]

Template filter generating a command prefix from directives.

Parameters
  • operation (flow.project._JobOperation) – The operation to be prefixed.

  • parallel (bool) – If True, operations are assumed to be executed in parallel, which means that the number of total tasks is the sum of all tasks instead of the maximum number of tasks. Default is set to False.

  • mpi_prefix (str) – User defined mpi_prefix string. Default is set to None. This will be deprecated and removed in the future.

  • cmd_prefix (str) – User defined cmd_prefix string. Default is set to None. This will be deprecated and removed in the future.

Returns

The prefix to be added to the operation’s command.

Return type

str

classmethod get_scheduler()[source]

Return an environment-specific scheduler driver.

The returned scheduler class provides a standardized interface to different scheduler implementations.

classmethod is_present()[source]

Determine whether this specific compute environment is present.

The default method for environment detection is trying to match a hostname pattern or delegate the environment check to the associated scheduler type.

classmethod submit(script, flags=None, *args, **kwargs)[source]

Submit a job submission script to the environment’s scheduler.

Scripts should be submitted to the environment, instead of directly to the scheduler to allow for environment specific post-processing.

Parameters
  • script (str) – The script to submit.

  • flags (list) – A list of additional flags to provide to the scheduler. (Default value = None)

  • *args – Positional arguments forwarded to the scheduler’s submit method.

  • **kwargs – Keyword arguments forwarded to the scheduler’s submit method.

Returns

Status of job, if submitted.

Return type

JobStatus.submitted or None

class flow.environment.StandardEnvironment[source]

Bases: flow.environment.ComputeEnvironment

Default environment which is always present.

classmethod is_present()[source]

Determine whether this specific compute environment is present.

The StandardEnvironment is always present, so this returns True.

class flow.environment.TestEnvironment[source]

Bases: flow.environment.ComputeEnvironment

Environment used for testing.

The test environment will print a mocked submission script and submission commands to screen. This enables testing of the job submission script generation in environments without a real scheduler.

scheduler_type

alias of flow.scheduling.fake_scheduler.FakeScheduler

class flow.environment.SimpleSchedulerEnvironment[source]

Bases: flow.environment.ComputeEnvironment

An environment for the simple-scheduler scheduler.

scheduler_type

alias of flow.scheduling.simple_scheduler.SimpleScheduler

flow.environment.registered_environments(import_configured=True)[source]

Return a list of registered environments.

Parameters

import_configured (bool) – Whether to import environments specified in the flow configuration. (Default value = True)

Returns

List of registered environments.

Return type

list
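
For example, the names of all registered environment classes can be listed with the following sketch:

from flow.environment import registered_environments

for env in registered_environments():
    print(env.__name__)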

Schedulers

Defines the API for the scheduling system.

class flow.scheduling.base.JobStatus(value)[source]

Bases: enum.IntEnum

Classifies the job’s execution status.

Group statuses exist to enable status output for individual operations within a larger group.

active = 7

The cluster job is actively running.

error = 8

The cluster job is in an error or failed state.

group_active = 14

The operation is in a group that is actively running.

group_error = 15

The operation is in a group that is in an error or failed state.

group_held = 12

The operation is in a group that is held.

group_inactive = 10

The operation is in a group that is inactive.

This includes states like completed, cancelled, or timed out.

group_queued = 13

The operation is in a group that is queued.

group_registered = 9

The operation is in a group that is registered with the scheduler.

group_submitted = 11

The operation is in a group that has been submitted.

Note that this state is never returned by a scheduler, but is an assumed state immediately after a group containing the operation is submitted.

held = 5

The cluster job is held.

inactive = 3

The cluster job is inactive.

This includes states like completed, cancelled, or timed out.

placeholder = 127

A placeholder state that is used for status rendering when no operations are eligible.

queued = 6

The cluster job is queued.

registered = 2

The cluster job is registered with the scheduler, but no other status is known.

submitted = 4

The cluster job has been submitted.

Note that this state is never returned by a scheduler, but is an assumed state immediately after a cluster job is submitted.

unknown = 1

Unknown cluster job status.

user = 128

All user-defined states must be >=128 in value.

class flow.scheduling.base.ClusterJob(job_id, status=None)[source]

Bases: object

Class representing a cluster job.

name()[source]

Return the name of the cluster job.

status()[source]

Return the status of the cluster job.

class flow.scheduling.base.Scheduler[source]

Bases: abc.ABC

Abstract base class for schedulers.

abstract classmethod is_present()[source]

Return True if the scheduler is detected.

abstract jobs()[source]

Yield all cluster jobs.

Yields

ClusterJob – Cluster job.

abstract submit(script, **kwargs)[source]

Submit a job script to the scheduler for execution.
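
A custom scheduler driver implements these three methods. The sketch below is a minimal illustration under stated assumptions; the command name 'mysubmit' is hypothetical:

from shutil import which

from flow.scheduling.base import ClusterJob, JobStatus, Scheduler

class MyScheduler(Scheduler):

    @classmethod
    def is_present(cls):
        # Assume this scheduler is available whenever its submit command is.
        return which('mysubmit') is not None

    def jobs(self):
        # A real implementation would query the scheduler and yield a
        # ClusterJob for each known cluster job.
        for job_id in ():  # replace with a real query of the scheduler
            yield ClusterJob(job_id, JobStatus.registered)

    def submit(self, script, **kwargs):
        # A real implementation would pass the script to the submit command.
        print(script)
        return True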

class flow.scheduling.fake_scheduler.FakeScheduler[source]

Bases: flow.scheduling.base.Scheduler

Implementation of the abstract Scheduler class for a fake scheduler.

This scheduler does not actually schedule (or execute) any jobs, but it can be used to test the submission workflow.

classmethod is_present()[source]

Return False.

The FakeScheduler is never present unless manually specified.

jobs()[source]

Return None (no jobs are scheduled by the FakeScheduler).

submit(script, **kwargs)[source]

Print the script to screen.

Parameters
  • script (str) – Script to print.

  • **kwargs – Keyword arguments (ignored).
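
For example, submitting through the fake scheduler simply echoes the script:

    from flow.scheduling.fake_scheduler import FakeScheduler

    scheduler = FakeScheduler()
    scheduler.submit("#!/bin/bash\necho hello\n")  # printed, not executed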

class flow.scheduling.lsf.LSFScheduler(user=None)[source]

Bases: flow.scheduling.base.Scheduler

Implementation of the abstract Scheduler class for LSF schedulers.

This class can submit cluster jobs to an LSF scheduler and query their current status.

Parameters
  • user (str) – Limit the status information to cluster jobs submitted by user.

  • **kwargs – Forwarded to the parent constructor.

classmethod is_present()[source]

Return True if an LSF scheduler is detected.

jobs()[source]

Yield cluster jobs by querying the scheduler.

submit(script, *, after=None, hold=False, pretend=False, flags=None, **kwargs)[source]

Submit a job script for execution to the scheduler.

Parameters
  • script (str) – The job script submitted for execution.

  • after (str) – Execute the submitted script after a job with this id has completed. (Default value = None)

  • hold (bool) – Whether to hold the job upon submission. (Default value = False)

  • pretend (bool) – If True, do not actually submit the script, but only simulate the submission. This can be used to test whether the submission would be successful. Note, however, that a successful “pretend” submission does not guarantee that an actual submission will succeed. (Default value = False)

  • flags (list) – Additional arguments to pass through to the scheduler submission command. (Default value = None)

  • **kwargs – Additional keyword arguments (ignored).

Returns

True if the submission command succeeds, or if running in pretend mode.

Return type

bool

Raises

SubmitError – If the submission command fails.
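
A sketch of a pretend submission, guarded by scheduler detection (the user name is a placeholder):

    from flow.scheduling.lsf import LSFScheduler

    if LSFScheduler.is_present():
        scheduler = LSFScheduler(user="alice")
        scheduler.submit("#!/bin/bash\necho hello\n", pretend=True)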

class flow.scheduling.pbs.PBSScheduler(user=None)[source]

Bases: flow.scheduling.base.Scheduler

Implementation of the abstract Scheduler class for PBS schedulers.

This class can submit cluster jobs to a PBS scheduler and query their current status.

Parameters
  • user (str) – Limit the status information to cluster jobs submitted by user.

  • **kwargs – Forwarded to the parent constructor.

classmethod is_present()[source]

Return True if a PBS scheduler is detected.

jobs()[source]

Yield cluster jobs by querying the scheduler.

submit(script, *, after=None, hold=False, pretend=False, flags=None, **kwargs)[source]

Submit a job script for execution to the scheduler.

Parameters
  • script (str) – The job script submitted for execution.

  • after (str) – Execute the submitted script after a job with this id has completed. (Default value = None)

  • hold (bool) – Whether to hold the job upon submission. (Default value = False)

  • pretend (bool) – If True, do not actually submit the script, but only simulate the submission. This can be used to test whether the submission would be successful. Note, however, that a successful “pretend” submission does not guarantee that an actual submission will succeed. (Default value = False)

  • flags (list) – Additional arguments to pass through to the scheduler submission command. (Default value = None)

  • **kwargs – Additional keyword arguments (ignored).

Returns

True if the submission command succeeds, or if running in pretend mode.

Return type

bool

Raises

SubmitError – If the submission command fails.
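
A sketch of a dependent, held submission on PBS (the cluster job id is a placeholder):

    from flow.errors import SubmitError
    from flow.scheduling.pbs import PBSScheduler

    script = "#!/bin/bash\necho hello\n"
    scheduler = PBSScheduler()
    try:
        # Submit held; the job depends on cluster job "12345" completing.
        scheduler.submit(script, after="12345", hold=True)
    except SubmitError as error:
        print(f"Submission failed: {error}")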

class flow.scheduling.simple_scheduler.SimpleScheduler[source]

Bases: flow.scheduling.base.Scheduler

Implementation of the abstract Scheduler class for the bundled simple-scheduler.

The signac-flow package includes a script, bin/simple-scheduler, that implements a simple model of a cluster job scheduler and is designed primarily for testing and demonstration.

This class can submit cluster jobs to the built-in simple scheduler and query their current status.

classmethod is_present()[source]

Return True if a SimpleScheduler is detected.

jobs()[source]

Yield cluster jobs by querying the scheduler.

submit(script, *, pretend=False, **kwargs)[source]

Submit a job script for execution to the scheduler.

Parameters
  • script (str) – The job script submitted for execution.

  • pretend (bool) – If True, do not actually submit the script, but only simulate the submission. This can be used to test whether the submission would be successful. Note, however, that a successful “pretend” submission does not guarantee that an actual submission will succeed. (Default value = False)

  • **kwargs – Keyword arguments (ignored).

Returns

True if the submission command succeeds, or if running in pretend mode.

Return type

bool

Raises

SubmitError – If the submission command fails.

class flow.scheduling.slurm.SlurmScheduler(user=None)[source]

Bases: flow.scheduling.base.Scheduler

Implementation of the abstract Scheduler class for SLURM schedulers.

This class can submit cluster jobs to a SLURM scheduler and query their current status.

Parameters
  • user (str) – Limit the status information to cluster jobs submitted by user.

  • **kwargs – Forwarded to the parent constructor.

classmethod is_present()[source]

Return True if a SLURM scheduler is detected.

jobs()[source]

Yield cluster jobs by querying the scheduler.

submit(script, *, after=None, hold=False, pretend=False, flags=None, **kwargs)[source]

Submit a job script for execution to the scheduler.

Parameters
  • script (str) – The job script submitted for execution.

  • after (str) – Execute the submitted script after a job with this id has completed. (Default value = None)

  • hold (bool) – Whether to hold the job upon submission. (Default value = False)

  • pretend (bool) – If True, do not actually submit the script, but only simulate the submission. This can be used to test whether the submission would be successful. Note, however, that a successful “pretend” submission does not guarantee that an actual submission will succeed. (Default value = False)

  • flags (list) – Additional arguments to pass through to the scheduler submission command. (Default value = None)

  • **kwargs – Additional keyword arguments (ignored).

Returns

True if the submission command succeeds, or if running in pretend mode.

Return type

bool

Raises

SubmitError – If the submission command fails.
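
The is_present() classmethods allow detecting at runtime which scheduler, if any, is available. A sketch:

    from flow.scheduling.lsf import LSFScheduler
    from flow.scheduling.pbs import PBSScheduler
    from flow.scheduling.slurm import SlurmScheduler

    for scheduler_class in (SlurmScheduler, PBSScheduler, LSFScheduler):
        if scheduler_class.is_present():
            print(f"Detected scheduler: {scheduler_class.__name__}")
            break
    else:
        print("No scheduler detected.")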

Error Classes

Definitions of Exception classes used in this package.

exception flow.errors.ConfigKeyError[source]

Bases: KeyError

Indicates that a config key was not found.

exception flow.errors.DirectivesError[source]

Bases: ValueError

Indicates that a directive was incorrectly set.

exception flow.errors.FlowProjectDefinitionError[source]

Bases: ValueError

Indicates an invalid FlowProject definition.

exception flow.errors.NoSchedulerError[source]

Bases: AttributeError

Indicates that there is no scheduler type defined for an environment class.

exception flow.errors.SubmitError[source]

Bases: RuntimeError

Indicates an error during cluster job submission.

class flow.errors.TemplateError(environment: jinja2.environment.Environment)[source]

Bases: jinja2.ext.Extension

Indicates an error in a jinja2 template.

err(msg, caller)[source]

Raise a template error.

parse(parser)[source]

Call err() when a template raises an Exception.

exception flow.errors.UserConditionError[source]

Bases: RuntimeError

Indicates an error during evaluation of a condition.

exception flow.errors.UserOperationError[source]

Bases: RuntimeError

Indicates an error during execution of a BaseFlowOperation.
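
These exceptions can be caught like any others. A sketch of defensive project driving (it assumes an initialized project; whether run() surfaces operation failures as UserOperationError may depend on the flow version):

    import flow
    from flow.errors import SubmitError, UserOperationError

    project = flow.FlowProject.get_project()

    try:
        project.submit()
    except SubmitError as error:
        print(f"Cluster submission failed: {error}")

    try:
        project.run()
    except UserOperationError as error:
        print(f"An operation failed during execution: {error}")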