API

Workflow management based on the signac framework.

The signac-flow package provides the basic infrastructure to easily configure and implement a workflow to operate on a signac data space.

class flow.FlowProject(config=None, environment=None)

Bases: signac.contrib.project.Project

A signac project class specialized for workflow management.

This class provides a command line interface for the definition, execution, and submission of workflows based on condition and operation functions.

This is a typical example on how to use this class:

@FlowProject.operation
def hello(job):
    print('hello', job)

FlowProject().main()
Parameters:config (A signac config object.) – A signac configuaration, defaults to the configuration loaded from the environment.
add_operation(name, cmd, pre=None, post=None, **kwargs)

Add an operation to the workflow.

This method will add an instance of FlowOperation to the operations-dict of this project.

See also

A Python function may be defined as an operation function directly using the operation() decorator.

Any FlowOperation is associated with a specific command, which should be a function of Job. The command (cmd) can be stated as function, either by using str-substitution based on a job’s attributes, or by providing a unary callable, which expects an instance of job as its first and only positional argument.

For example, if we wanted to define a command for a program called ‘hello’, which expects a job id as its first argument, we could contruct the following two equivalent operations:

op = FlowOperation('hello', cmd='hello {job._id}')
op = FlowOperation('hello', cmd=lambda 'hello {}'.format(job._id))

Here are some more useful examples for str-substitutions:

# Substitute job state point parameters:
op = FlowOperation('hello', cmd='cd {job.ws}; hello {job.sp.a}')

Pre-requirements (pre) and post-conditions (post) can be used to trigger an operation only when certain conditions are met. Conditions are unary callables, which expect an instance of job as their first and only positional argument and return either True or False.

An operation is considered “eligible” for execution when all pre-requirements are met and when at least one of the post-conditions is not met. Requirements are always met when the list of requirements is empty and post-conditions are never met when the list of post-conditions is empty.

Please note, eligibility in this contexts refers only to the workflow pipline and not to other contributing factors, such as whether the job-operation is currently running or queued.

Parameters:
  • name (str) – A unique identifier for this operation, may be freely choosen.
  • cmd (str or callable) – The command to execute operation; should be a function of job.
  • pre (sequence of callables) – required conditions
  • post – post-conditions to determine completion
classify(job)

Generator function which yields labels for job.

By default, this method yields from the project’s labels() method.

Parameters:job (Job) – The signac job handle.
Yields:The labels to classify job.
Yield type:str
completed_operations(job)

Determine which operations have been completed for job.

Parameters:job (Job) – The signac job handle.
Returns:The name of the operations that are complete.
Return type:str
eligible(job_operation, **kwargs)

Determine if job is eligible for operation.

Deprecated since version 0.5: Please use eligible_for_submission() instead.

eligible_for_submission(job_operation)

Determine if a job-operation is eligible for submission.

By default, an operation is eligible for submission when it is not considered active, that means already queued or running.

export_job_stati(collection, stati)

Export the job stati to a database collection.

get_job_status(job, ignore_errors=False)

Return a dict with detailed information about the status of a job.

classmethod label(label_name_or_func=None)

Designate a function to be a label function of this class.

For example, we can define a label function like this:

@FlowProject.label
def foo_label(job):
    if job.document.get('foo', False):
        return 'foo-label-text'

The foo-label-text label will now show up in the status view for each job, where the foo key evaluates true.

If instead of a str, the label functions returns any other type, the label name will be the name of the function if and only if the return value evaluates to True, for example:

@FlowProject.label
def foo_label(job):
    return job.document.get('foo', False)

Finally, you can specify a different default label name by providing it as the first argument to the label() decorator.

New in version 0.6.

labels(job)

Yields all labels for the given job.

See also: label()

main(parser=None, pool=None)

Call this function to use the main command line interface.

In most cases one would want to call this function as part of the class definition, e.g.:

 my_project.py
from flow import FlowProject

class MyProject(FlowProject):
    pass

if __name__ == '__main__':
    MyProject().main()

You can then execute this script on the command line:

$ python my_project.py --help
map_scheduler_jobs(scheduler_jobs)

Map all scheduler jobs by job id and operation name.

This function fetches all scheduled jobs from the scheduler and generates a nested dictionary, where the first key is the job id, the second key the operation name and the last value are the cooresponding scheduler jobs.

For example, to print the status of all scheduler jobs, associated with a specific job operation, execute:

sjobs = project.scheduler_jobs(scheduler)
sjobs_map = project.map_scheduler_jobs(sjobs)
for sjob in sjobs_map[job.get_id()][operation]:
    print(sjob._id(), sjob.status())
Parameters:scheduler_jobs – An iterable of scheduler job instances.
Returns:A nested dictionary (job_id, op_name, scheduler jobs)
next_operation(job)

Determine the next operation for this job.

Parameters:job (Job) – The signac job handle.
Returns:An instance of JobOperation to execute next or None, if no operation is eligible.
Return type::py:class:`~.JobOperation or NoneType
next_operations(job)

Determine the next eligible operations for job.

Parameters:job (Job) – The signac job handle.
Yield:All instances of JobOperation job is eligible for.
classmethod operation(func, name=None)

Add the function func as operation function to the class workflow definition.

This function is designed to be used as a decorator function, for example:

@FlowProject.operation
def hello(job):
    print('Hello', job)

See also: add_operation().

New in version 0.6.

operations

The dictionary of operations that have been added to the workflow.

print_status(jobs=None, overview=True, overview_max_lines=None, detailed=False, parameters=None, skip_active=False, param_max_width=None, expand=False, all_ops=False, only_incomplete=False, dump_json=False, unroll=True, compact=False, pretty=False, file=None, err=None, ignore_errors=False, scheduler=None, pool=None, job_filter=None, no_parallelize=False)

Print the status of the project.

Changed in version 0.6.

Parameters:
  • jobs (Sequence of instances Job.) – Only execute operations for the given jobs, or all if the argument is omitted.
  • overview (bool) – Aggregate an overview of the project’ status.
  • overview_max_lines (int) – Limit the number of overview lines.
  • detailed (bool) – Print a detailed status of each job.
  • parameters (list of str) – Print the value of the specified parameters.
  • skip_active (bool) – Only print jobs that are currently inactive.
  • param_max_width – Limit the number of characters of parameter columns, see also: update_aliases().
  • file – Redirect all output to this file, defaults to sys.stdout.
  • err – Redirect all error output to this file, defaults to sys.stderr.
  • scheduler (Scheduler) – (deprecated) The scheduler instance used to fetch the job statuses.
  • pool – (deprecated) A multiprocessing or threading pool. Providing a pool parallelizes this method.
  • job_filter – (deprecated) A JSON encoded filter, that all jobs to be submitted need to match.
run(jobs=None, names=None, pretend=False, np=None, timeout=None, num=None, num_passes=1, progress=False)

Execute all pending operations for the given selection.

This function will run in an infinite loop until all pending operations have been executed or the total number of passes per operation or the total number of exeutions have been reached.

By default there is no limit on the total number of executions, but a specific operation will only be executed once per job. This is to avoid accidental infinite loops when no or faulty post conditions are provided.

See also: run_operations()

Changed in version 0.6.

Parameters:
  • jobs (Sequence of instances Job.) – Only execute operations for the given jobs, or all if the argument is omitted.
  • names (Sequence of str) – Only execute operations that are in the provided set of names, or all, if the argument is omitted.
  • pretend (bool) – Do not actually execute the operations, but show which command would have been used.
  • np (int) – Parallelize to the specified number of processors. Use -1 to parallelize to all available processing units.
  • timeout (int) – An optional timeout for each operation in seconds after which execution will be cancelled. Use -1 to indicate not timeout (the default).
  • num (int) – The total number of operations that are executed will not exceed this argument if provided.
  • num_passes (int) – The total number of one specific job-operation pair will not exceed this argument. The default is 1, there is no limit if this argumet is None.
  • progress – Show a progress bar during execution.
run_operations(operations=None, pretend=False, np=None, timeout=None, progress=False)

Execute the next operations as specified by the project’s workflow.

See also: run()

New in version 0.6.

Parameters:
  • operations (Sequence of instances of JobOperation) – The operations to execute (optional).
  • pretend (bool) – Do not actually execute the operations, but show which command would have been used.
  • np (int) – The number of processors to use for each operation.
  • timeout (int) – An optional timeout for each operation in seconds after which execution will be cancelled. Use -1 to indicate not timeout (the default).
  • progress – Show a progress bar during execution.
scheduler_jobs(scheduler)

Fetch jobs from the scheduler.

This function will fetch all scheduler jobs from the scheduler and also expand bundled jobs automatically.

However, this function will not automatically filter scheduler jobs which are not associated with this project.

Parameters:scheduler (Scheduler) – The scheduler instance.
Yields:All scheduler jobs fetched from the scheduler instance.
script(operations, parallel=False, template='script.sh', show_template_help=False)

Generate a run script to execute given operations.

Parameters:
  • operations (Sequence of instances of JobOperation) – The operations to execute.
  • parallel – Execute all operations in parallel (default is False).
  • parallel – bool
  • template (str) – The name of the template to use to generate the script.
  • show_template_help (bool) – Show help related to the templating system and then exit.
submit(bundle_size=1, jobs=None, names=None, num=None, parallel=False, force=False, walltime=None, env=None, **kwargs)

Submit function for the project’s main submit interface.

Changed in version 0.6.

Parameters:
  • bundle_size (int) – Specify the number of operations to be bundled into one submission, defaults to 1.
  • jobs (Sequence of instances Job.) – Only submit operations associated with the provided jobs. Defaults to all jobs.
  • names (Sequence of str) – Only submit operations with any of the given names, defaults to all names.
  • num (int) – Limit the total number of submitted operations, defaults to no limit.
  • parallel (bool) – Execute all bundled operations in parallel. Has no effect without bundling.
  • force (bool) – Ignore all warnings or checks during submission, just submit.
  • walltime – Specify the walltime in hours or as instance of datetime.timedelta.
submit_operations(operations, _id=None, env=None, parallel=False, flags=None, force=False, template='script.sh', pretend=False, show_template_help=False, **kwargs)

Submit a sequence of operations to the scheduler.

Changed in version 0.6.

Parameters:
  • operations (A sequence of instances of JobOperation) – The operations to submit.
  • _id (str) – The _id to be used for this submission.
  • serial (bool) – Execute all bundled operations in serial.
  • flags (list) – Additional options to be forwarded to the scheduler.
  • force (bool) – Ignore all warnings or checks during submission, just submit.
  • template (str) – The name of the template file to be used to generate the submission script.
  • pretend (bool) – Do not actually submit, but only print the submission script to screen. Useful for testing the submission workflow.
  • kwargs – Additional keyword arguments to be forwarded to the scheduler.
Returns:

Return the submission status after successful submission or None.

classmethod update_aliases(aliases)

Update the ALIASES table for this class.

update_stati(scheduler, jobs=None, file=<_io.TextIOWrapper name='<stderr>' mode='w' encoding='UTF-8'>, pool=None, ignore_errors=False)

This function has been removed as of version 0.6.

classmethod write_human_readable_statepoint(script, job)

Write statepoint of job in human-readable format to script.

Deprecated since version 0.6: Users should migrate to the new templating system.

write_script(script, operations, background=False, **kwargs)

Write a script for the execution of operations.

Deprecated since version 0.6: Users should migrate to the new templating system.

By default, this function will generate a script with the following components:

write_script_header(script)
write_script_operations(script, operations, background=background)
write_script_footer(script)
Parameters:
  • script – The script to write the commands to.
  • operations (A sequence of JobOperation) – The operations to be written to the script.
  • background (bool) – Whether operations should be executed in the background; useful to parallelize execution.

“Write the script footer for the execution script.

Deprecated since version 0.6: Users should migrate to the new templating system.

write_script_header(script, **kwargs)

“Write the script header for the execution script.

Deprecated since version 0.6: Users should migrate to the new templating system.

write_script_operations(script, operations, background=False, **kwargs)

Write the commands for the execution of operations as part of a script.

Deprecated since version 0.6: Users should migrate to the new templating system.

class flow.JobOperation(name, job, cmd, directives=None, np=None)

Bases: object

This class represents the information needed to execute one operation for one job.

An operation function in this context is a shell command, which should be a function of one and only one signac job.

Note

This class is used by the FlowProject class for the execution and submission process and should not be instantiated by users themselves.

Changed in version 0.6.

Parameters:
  • name (str) – The name of this JobOperation instance. The name is arbitrary, but helps to concisely identify the operation in various contexts.
  • job (signac.Job.) – The job instance associated with this operation.
  • cmd (str) – The command that executes this operation.
  • directives (dict) – A dictionary of additional parameters that provide instructions on how to execute this operation, e.g., specifically required resources.
get_id(index=0)

Return a name, which identifies this job-operation.

get_status()

Retrieve the operation’s last known status.

set_status(value)

Store the operation’s status.

class flow.label(name=None)

Bases: object

Decorate a FlowProject class function as a label function. For example:

class MyProject(FlowProject):

    @label()
    def foo(self, job):
        return True
class flow.classlabel(name=None)

Bases: flow.labels.label

A label decorator for classmethods.

This decorator implies “classmethod”!

class flow.staticlabel(name=None)

Bases: flow.labels.label

A label decorator for staticmethods.

This decorator implies “staticmethod”!

flow.cmd(func)

Specifies that func returns a shell command.

If this function is an operation function defined by FlowProject, it will be interpreted to return a shell command, instead of executing the function itself.

For example:

@FlowProject.operation
@flow.cmd
def hello(job):
    return "echo {job._id}"
class flow.directives(**kwargs)

Bases: object

Decorator for operation functions to provide additional execution directives.

Directives can for example be used to provide information about required resources such as the number of processes required for execution of parallelized operations.

flow.run(parser=None)

Access to the “run” interface of an operations module.

Executing this function within a module will start a command line interface, that can be used to execute operations defined within the same module. All top-level unary functions will be intepreted as executable operation functions.

For example, if we have a module as such:

# operations.py

def hello(job):
    print('hello', job)

if __name__ == '__main__':
    import flow
    flow.run()

Then we can execute the hello operation for all jobs from the command like like this:

$ python operations.py hello

Note

You can control the degree of parallelization with the --np argument.

For more information, see:

$ python operations.py --help
flow.init(alias=None, template=None, root=None, out=None)

Initialize a templated FlowProject module.

flow.redirect_log(job, filename='run.log', formatter=None, logger=None)

Redirect all messages logged via the logging interface to the given file.

Parameters:
  • job (signac.Project.Job) – An instance of a signac job.
  • logger – The instance of logger to which the new file log handler is added. Defaults to the default logger returned by logging.getLogger() if this argument is not provided.
Formatter:

The logging formatter to use, uses a default formatter if this argument is not provided.

type logger:
logging.Logger
flow.get_environment(test=False, import_configured=True)

Attempt to detect the present environment.

This function iterates through all defined ComputeEnvironment classes in reversed order of definition and and returns the first EnvironmentClass where the is_present() method returns True.

Parameters:test (bool) – Return the TestEnvironment
Returns:The detected environment class.