flow API

Module contents

Workflow management based on the signac framework.

The signac-flow package provides the basic infrastructure to easily configure and implement a workflow to operate on a signac data space.

class flow.FlowProject(config=None)

Bases: signac.contrib.project.Project

A signac project class assisting in workflow management.

Parameters:config (A signac config object.) – A signac configuration, defaults to the configuration loaded from the environment.
add_operation(name, cmd, pre=None, post=None, np=None, **kwargs)

Add an operation to the workflow.

This method will add an instance of FlowOperation to the operations-dict of this project.

Any FlowOperation is associated with a specific command, which should be a function of Job. The command (cmd) can be stated either as a string, using str-substitution based on a job’s attributes, or as a unary callable, which expects an instance of job as its first and only positional argument.

For example, if we wanted to define a command for a program called ‘hello’, which expects a job id as its first argument, we could construct the following two equivalent operations:

op = FlowOperation('hello', cmd='hello {job._id}')
op = FlowOperation('hello', cmd=lambda job: 'hello {}'.format(job._id))

Here are some more useful examples for str-substitutions:

# Substitute job state point parameters:
op = FlowOperation('hello', cmd='cd {job.ws}; hello {job.sp.a}')
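The substitution itself is plain Python str.format; here is a minimal sketch with a stand-in job object (FakeJob is illustrative only and not part of flow):

```python
class FakeJob:
    """Stand-in for a signac job handle, for illustration only."""
    _id = 'abc123'

# flow performs the equivalent substitution before executing the command:
cmd = 'hello {job._id}'.format(job=FakeJob())
print(cmd)  # hello abc123
```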

Pre-requirements (pre) and post-conditions (post) can be used to trigger an operation only when certain conditions are met. Conditions are unary callables, which expect an instance of job as their first and only positional argument and return either True or False.

An operation is considered “eligible” for execution when all pre-requirements are met and when at least one of the post-conditions is not met. Requirements are always met when the list of requirements is empty and post-conditions are never met when the list of post-conditions is empty.

Please note that eligibility in this context refers only to the workflow pipeline and not to other contributing factors, such as whether the job-operation is currently running or queued.
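The eligibility rule above can be sketched in plain Python (a simplified model of the documented semantics, not flow’s actual implementation):

```python
def eligible(job, pre=(), post=()):
    """Model: all pre-requirements met and at least one post-condition unmet."""
    # An empty pre list is vacuously met (all(()) is True).
    prereqs_met = all(condition(job) for condition in pre)
    # With an empty post list the operation is never considered complete,
    # so it remains eligible as long as its pre-requirements hold.
    incomplete = (not post) or any(not condition(job) for condition in post)
    return prereqs_met and incomplete
```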

Parameters:
  • name (str) – A unique identifier for this operation, may be freely chosen.
  • cmd (str or callable) – The command to execute the operation; should be a function of job.
  • pre (sequence of callables) – required conditions
  • post – post-conditions to determine completion
  • np (int) – Specify the number of processors this operation requires, defaults to 1.
classmethod add_print_status_args(parser)

Add arguments to parser for the print_status() method.

classmethod add_script_args(parser)

Add arguments to parser for the script() method.

classmethod add_submit_args(parser)

Add arguments to parser for the submit() method.

classify(job)

Generator function which yields labels for job.

By default, this method yields from the project’s labels() method.

Parameters:job (Job) – The signac job handle.
Yields:The labels to classify job.
Yield type:str
completed_operations(job)

Determine which operations have been completed for job.

Parameters:job (Job) – The signac job handle.
Returns:The names of the operations that are complete.
Return type:str
eligible(job_operation, **kwargs)

Determine if job is eligible for operation.

Warning

This function is deprecated, please use eligible_for_submission() instead.

eligible_for_submission(job_operation)

Determine if a job-operation is eligible for submission.

By default, an operation is eligible for submission when it is not considered active, that is, when it is not already queued or running.

export_job_stati(collection, stati)

Export the job stati to a database collection.

format_row(status, statepoint=None, max_width=None)

Format each row in the detailed status output.

get_job_status(job)

Return the detailed status of a job.

labels(job)

Auto-generate labels from label-functions.

This generator function will automatically yield labels, from project methods decorated with the @label decorator.

For example, we can define a function like this:

class MyProject(FlowProject):

    @label()
    def is_foo(self, job):
        return job.document.get('foo', False)

The labels() generator method will now yield an is_foo label whenever the job document has a field foo which evaluates to True.

By default, the label name is equal to the function’s name, but you can specify a custom label as the first argument to the label decorator, e.g.: @label('foo_label').

Tip

In this particular case it may make sense to define the is_foo() method as a staticmethod, since it does not actually depend on the project instance. We can do this by using the @staticlabel() decorator, or equivalently the @classlabel() decorator for class methods.

main(parser=None, pool=None)

Call this function to use the main command line interface.

In most cases one would want to call this function as part of the class definition, e.g.:

# my_project.py
from flow import FlowProject

class MyProject(FlowProject):
    pass

if __name__ == '__main__':
    MyProject().main()

You can then execute this script on the command line:

$ python my_project.py --help
map_scheduler_jobs(scheduler_jobs)

Map all scheduler jobs by job id.

This function fetches all scheduled jobs from the scheduler and generates a nested dictionary, where the first key is the job id, the second key is the operation name, and the value is the corresponding scheduler jobs.

For example, to print the status of all scheduler jobs, associated with a specific job operation, execute:

sjobs = project.scheduler_jobs(scheduler)
sjobs_map = project.map_scheduler_jobs(sjobs)
for sjob in sjobs_map[job.get_id()][operation]:
    print(sjob._id(), sjob.status())
Parameters:scheduler_jobs – An iterable of scheduler job instances.
Returns:A nested dictionary (job_id, op_name, scheduler jobs)
next_operation(job)

Determine the next operation for this job.

Parameters:job (Job) – The signac job handle.
Returns:An instance of JobOperation to execute next or None, if no operation is eligible.
Return type:JobOperation or NoneType
next_operations(job)

Determine the next operations for job.

You can, but do not have to, use this function to simplify the submission process. The default method yields all operations that a job is eligible for, as defined by the add_operation() method.

Parameters:job (Job) – The signac job handle.
Yield:All instances of JobOperation a job is eligible for.
operations

The dictionary of operations that have been added to the workflow.

print_status(scheduler=None, job_filter=None, overview=True, overview_max_lines=None, detailed=False, parameters=None, skip_active=False, param_max_width=None, file=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='UTF-8'>, err=<_io.TextIOWrapper name='<stderr>' mode='w' encoding='UTF-8'>, pool=None, ignore_errors=False)

Print the status of the project.

Parameters:
  • scheduler (Scheduler) – The scheduler instance used to fetch the job stati.
  • job_filter – A JSON encoded filter, that all jobs to be submitted need to match.
  • overview (bool) – Aggregate an overview of the project’s status.
  • overview_max_lines (int) – Limit the number of overview lines.
  • detailed (bool) – Print a detailed status of each job.
  • parameters (list of str) – Print the value of the specified parameters.
  • skip_active (bool) – Only print jobs that are currently inactive.
  • param_max_width – Limit the number of characters of parameter columns, see also: update_aliases().
  • file – Redirect all output to this file, defaults to sys.stdout
  • err – Redirect all error output to this file, defaults to sys.stderr
  • pool – A multiprocessing or threading pool. Providing a pool parallelizes this method.
run(operations=None, pretend=False, np=None, timeout=None, progress=False)

Execute the next operations as specified by the project’s workflow.

scheduler_jobs(scheduler)

Fetch jobs from the scheduler.

This function will fetch all scheduler jobs from the scheduler and also expand bundled jobs automatically.

However, this function will not automatically filter scheduler jobs which are not associated with this project.

Parameters:scheduler (Scheduler) – The scheduler instance.
Yields:All scheduler jobs fetched from the scheduler instance.
submit(env, bundle_size=1, serial=False, force=False, nn=None, ppn=None, walltime=None, **kwargs)

Submit function for the project’s main submit interface.

This method gathers and optionally bundles all operations which are eligible for execution, prepares a submission script using the write_script() method, and finally attempts to submit these to the scheduler.

The primary advantage of using this method over a manual submission process is that submit() will keep track of operation submit status (queued/running/completed/etc.) and will automatically prevent the submission of the same operation multiple times if it is considered active (e.g. queued or running).

submit_operations(env, _id, operations, nn=None, ppn=None, serial=False, flags=None, force=False, **kwargs)

Submit a sequence of operations to the scheduler.

classmethod update_aliases(aliases)

Update the ALIASES table for this class.

update_stati(scheduler, jobs=None, file=<_io.TextIOWrapper name='<stderr>' mode='w' encoding='UTF-8'>, pool=None, ignore_errors=False)

Update the status of all jobs with the given scheduler.

Parameters:
  • scheduler (Scheduler) – The scheduler instance used to fetch the job stati.
  • jobs – A sequence of Job instances.
  • file – The file to write output to, defaults to sys.stderr.
write_human_readable_statepoint(script, job)

Write statepoint of job in human-readable format to script.

write_script(script, operations, background=False)

Write a script for the execution of operations.

By default, this function will generate a script with the following components:

write_script_header(script)
write_script_operations(script, operations, background=background)
write_script_footer(script)

Consider overloading any of the methods above, before overloading this method.

Parameters:
  • script – The script to write the commands to.
  • operations (sequence of JobOperation) – The operations to be written to the script.
  • background (bool) – Whether operations should be executed in the background; useful to parallelize execution
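The three-part composition can be modeled with a plain StringIO (a simplified sketch; the real methods emit environment-specific content):

```python
import io

def sketch_write_script(script, operations, background=False):
    # Mirrors the documented order: header, operations, footer.
    script.write('#!/bin/bash\n')                      # write_script_header
    for cmd in operations:                             # write_script_operations
        script.write(cmd + (' &' if background else '') + '\n')
    if background:                                     # write_script_footer
        script.write('wait\n')

script = io.StringIO()
sketch_write_script(script, ['hello abc123', 'hello def456'], background=True)
print(script.getvalue())
```

Overriding only the header or footer while keeping this overall structure is the intended customization path.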

write_script_footer(script)

Write the script footer for the execution script.

write_script_header(script, **kwargs)

Write the script header for the execution script.

write_script_operations(script, operations, background=False)

Write the commands for the execution of operations as part of a script.

class flow.JobOperation(name, job, cmd, np=None, mpi=False)

Bases: object

Define operations to apply to a job.

An operation function in the context of signac is a function with only one job argument. This in principle ensures that operations are deterministic in the sense that both input and output depend only on the job’s metadata and data.

This class is designed to define commands to be executed on the command line that constitute an operation.

Note

The command arguments should only depend on the job metadata to ensure deterministic operations.

Parameters:
  • name (str) – The name of this JobOperation instance. The name is arbitrary, but helps to concisely identify the operation in various contexts.
  • job (signac.Job) – The job instance associated with this operation.
get_id()

Return a name, which identifies this job-operation.

get_status()

Retrieve the operation’s last known status.

set_status(value)

Store the operation’s status.

class flow.label(name=None)

Bases: object

Decorate a function to be a label function.

The label() method as part of FlowProject iterates over all methods decorated with this decorator and yields the method’s name or the provided name.

For example:

class MyProject(FlowProject):

    @label()
    def foo(self, job):
        return True

    @label()
    def bar(self, job):
        return 'a' in job.statepoint()

>>> for label in MyProject().labels(job):
...     print(label)

The code segment above will always print the label ‘foo’, but the label ‘bar’ only if ‘a’ is part of a job’s state point.

This enables the user to quickly write classification functions and use them for labeling, for example in the classify() method.

class flow.classlabel(name=None)

Bases: flow.project.label

A label decorator for classmethods.

This decorator implies “classmethod”!

class flow.staticlabel(name=None)

Bases: flow.project.label

A label decorator for staticmethods.

This decorator implies “staticmethod”!

flow.get_environment(test=False, import_configured=True)

Attempt to detect the present environment.

This function iterates through all defined ComputeEnvironment classes in reversed order of definition and returns the first environment class whose is_present() method returns True.

Parameters:test – Return the TestEnvironment
Returns:The detected environment class.
flow.run(parser=None)

Access to the “run” interface of an operations module.

Executing this function within a module will start a command line interface that can be used to execute operations defined within the same module. All top-level unary functions will be interpreted as executable operation functions.

For example, if we have a module as such:

# operations.py

def hello(job):
    print('hello', job)

if __name__ == '__main__':
    import flow
    flow.run()

Then we can execute the hello operation for all jobs from the command line like this:

$ python operations.py hello

Note

The execution of operations is automatically parallelized. You can control the degree of parallelization with the --np argument.

For more information, see:

$ python operations.py --help

flow.scheduler module

flow.environment module

Detection of compute environments.

This module provides the ComputeEnvironment class, which can be subclassed to automatically detect specific computational environments.

This enables the user to adjust their workflow based on the present environment, e.g. for the adjustment of scheduler submission scripts.

class flow.environment.ComputeEnvironment

Bases: object

Define computational environments.

The ComputeEnvironment class allows us to automatically determine specific environments in order to programmatically adjust workflows in different environments.

The default method for the detection of a specific environment is to provide a regular expression matching the environment’s hostname. For example, if the hostname is my_server.com, one could identify the environment by setting the hostname_pattern to ‘my_server’.
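A hypothetical subclass following this pattern might look as follows (the class name and pattern are illustrative, and the matching helper is a simplified model of what is_present() does against the actual hostname):

```python
import re

class MyServerEnvironment:
    """Illustrative only; a real definition would subclass ComputeEnvironment."""
    hostname_pattern = 'my_server'

    @classmethod
    def matches(cls, hostname):
        # Simplified model of is_present(): regex-match the hostname.
        return re.match(cls.hostname_pattern, hostname) is not None
```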

static bg(cmd)

Wrap a command (cmd) to be executed in the background.

classmethod get_config_value(key, default=None)

Request a value from the user’s configuration.

This method should be used whenever values need to be provided that are specific to a user’s environment. A good example is account names.

When a key is not configured and no default value is provided, a SubmitError will be raised and the user will be prompted to add the missing key to their configuration.

Please note that the key will be automatically expanded to be specific to this environment definition. For example, a key should be ‘account’, not ‘MyEnvironment.account’.

Parameters:
  • key (str) – The environment specific configuration key.
  • default – A default value in case the key cannot be found within the user’s configuration.
Returns:

The value or default value.

Raises:

SubmitError – If the key is not in the user’s configuration and no default value is provided.
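The described lookup can be modeled roughly like this (a sketch only; config is a plain dict standing in for the user’s signac configuration, and the key expansion mirrors the documented behavior):

```python
class SubmitError(RuntimeError):
    """Stand-in for flow.errors.SubmitError."""

def get_config_value(config, env_name, key, default=None):
    # The short key is expanded to be environment-specific,
    # e.g. 'account' -> 'MyEnvironment.account'.
    full_key = '{}.{}'.format(env_name, key)
    if full_key in config:
        return config[full_key]
    if default is None:
        # No configured value and no default: prompt the user by raising.
        raise SubmitError('Please configure the value for: ' + full_key)
    return default
```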

classmethod get_scheduler()

Return an environment-specific scheduler driver.

The returned scheduler class provides a standardized interface to different scheduler implementations.

classmethod is_present()

Determine whether this specific compute environment is present.

The default method for environment detection is trying to match a hostname pattern.

classmethod script(**kwargs)

Return a JobScript instance.

Derived ComputeEnvironment classes may require additional arguments for the creation of a job submission script.

classmethod submit(script, flags=None, *args, **kwargs)

Submit a job submission script to the environment’s scheduler.

Scripts should be submitted to the environment, instead of directly to the scheduler to allow for environment specific post-processing.

class flow.environment.ComputeEnvironmentType(name, bases, dct)

Bases: type

Meta class for the definition of ComputeEnvironments.

This meta class automatically registers ComputeEnvironment definitions, which enables the automatic determination of the present environment.

class flow.environment.DefaultSlurmEnvironment

Bases: flow.environment.NodesEnvironment, flow.environment.SlurmEnvironment

A default environment for environments with slurm scheduler.

class flow.environment.DefaultTorqueEnvironment

Bases: flow.environment.NodesEnvironment, flow.environment.TorqueEnvironment

A default environment for environments with TORQUE scheduler.

class flow.environment.JobScript(env)

Bases: _io.StringIO

A simple StringIO wrapper for the creation of job submission scripts.

Using this class to write a job submission script allows us to use environment specific expressions, for example for MPI commands.

write_cmd(cmd, bg=False, np=None)

Write a command to the jobscript.

This command wrapper function is a convenience function which adds MPI and other directives whenever necessary.

Parameters:
  • cmd (str) – The command to write to the jobscript.
  • np (int) – The number of processors required for execution.
writeline(line='')

Write one line to the job script.

class flow.environment.MoabEnvironment(*args, **kwargs)

Bases: flow.environment.ComputeEnvironment

An environment with TORQUE scheduler.

This class is deprecated and only kept for backwards compatibility.

class flow.environment.SlurmEnvironment

Bases: flow.environment.ComputeEnvironment

An environment with slurm scheduler.

class flow.environment.TestEnvironment

Bases: flow.environment.ComputeEnvironment

This is a test environment.

The test environment will print a mocked submission script and submission commands to screen. This enables testing of the job submission script generation in environments without a real scheduler.

class flow.environment.TorqueEnvironment

Bases: flow.environment.ComputeEnvironment

An environment with TORQUE scheduler.

class flow.environment.UnknownEnvironment

Bases: flow.environment.ComputeEnvironment

This is a default environment, which is always present.

flow.environment.format_timedelta(delta)

Format a time delta for interpretation by schedulers.
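Most schedulers accept walltimes in HH:MM:SS form; a sketch of such formatting (the exact output of flow’s format_timedelta may differ):

```python
import datetime

def format_walltime(delta):
    # Render a timedelta as HH:MM:SS; hours may exceed 24 for long jobs.
    total = int(delta.total_seconds())
    hours, rem = divmod(total, 3600)
    minutes, seconds = divmod(rem, 60)
    return '{:02d}:{:02d}:{:02d}'.format(hours, minutes, seconds)

print(format_walltime(datetime.timedelta(hours=1, minutes=30)))  # 01:30:00
```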

flow.environment.get_environment(test=False, import_configured=True)

Attempt to detect the present environment.

This function iterates through all defined ComputeEnvironment classes in reversed order of definition and returns the first environment class whose is_present() method returns True.

Parameters:test – Return the TestEnvironment
Returns:The detected environment class.
flow.environment.setup(py_modules, **attrs)

Setup function for environment modules.

Use this function in place of setuptools.setup to not only install an environments module, but also register it with the global signac configuration. Once registered, it is automatically imported when the get_environment() function is called.

flow.environments module

The environments module contains additional opt-in environment profiles.

Add the following line to your project modules, to use these profiles:

import flow.environments

flow.manage module

class flow.manage.JobStatus

Bases: enum.IntEnum

Classifies the job’s execution status.

The stati are ordered by the significance of the execution status. This enables easy comparisons, such as status >= JobStatus.submitted, which prevents the submission of a job that is already submitted, queued, active, or in an error state.
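A stand-in IntEnum illustrates how the ordering supports this guard (member names and values here are illustrative, not the real JobStatus definition):

```python
import enum

class JobStatusSketch(enum.IntEnum):
    # Ordered by significance of the execution status.
    unknown = 1
    registered = 2
    submitted = 4
    queued = 6
    active = 8
    error = 10

def should_submit(status):
    # The documented guard: skip anything already submitted or beyond.
    return status < JobStatusSketch.submitted
```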

class flow.manage.Scheduler(header=None, cores_per_node=None, *args, **kwargs)

Bases: object

Generic Scheduler ABC

jobs()

Yield ClusterJob instances.

flow.manage.submit(env, project, state_point, script, identifier='default', force=False, pretend=False, *args, **kwargs)

Attempt to submit a job to the scheduler of the current environment.

The job status will be determined from the job’s status document. If the job’s status is greater than or equal to JobStatus.submitted, the job will not be submitted, unless the force option is provided.

flow.manage.update_status(job, scheduler_jobs=None)

Update the job’s status dictionary.

flow.errors module

exception flow.errors.NoSchedulerError

Bases: AttributeError

Indicates that there is no scheduler type defined for an environment class.

exception flow.errors.SubmitError

Bases: RuntimeError

Indicates an error during cluster job submission.

flow.fakescheduler module

flow.torque module

Routines for the MOAB environment.

flow.slurm module

Routines for the SLURM environment.