API Reference¶
This is the API for the signac-flow application.
Command Line Interface¶
Some core signac-flow functions are, in addition to the Python interface, accessible directly via the $ flow command.
For more information, please see $ flow --help.
usage: flow [-h] [--debug] [--version] {init} ...
flow provides the basic components to set up workflows for projects as part of
the signac framework.
positional arguments:
{init}
init Initialize a signac-flow project.
optional arguments:
-h, --help show this help message and exit
--debug Show traceback on error for debugging.
--version Display the version number and exit.
The FlowProject¶
Attributes

FlowProject.ALIASES – Default aliases used within the status output.
FlowProject.completed_operations(job) – Determine which operations have been completed for job.
FlowProject.get_job_status(job[, …]) – Return status information about a job.
FlowProject.label([label_name_or_func]) – Designate a function as a label function for this class.
FlowProject.labels(job) – Yield all labels for the given job.
FlowProject.main([parser]) – Call this function to use the main command line interface.
FlowProject.make_group(name[, options]) – Make a FlowGroup named name and return a decorator to make groups.
FlowProject.operation(func[, name]) – Add an operation function to the class workflow definition.
FlowProject.operations – Get the dictionary of operations that have been added to the workflow.
FlowProject.post(condition[, tag]) – Define and evaluate postconditions for operations.
FlowProject.post.copy_from(*other_funcs) – Copy postconditions from other operation(s).
FlowProject.post.false(key) – Evaluate if a document key is False for the job(s).
FlowProject.post.isfile(filename) – Determine if the specified file exists for the job(s).
FlowProject.post.never(func) – Return False.
FlowProject.post.not_(condition) – Return not condition(*jobs) for the provided condition function.
FlowProject.post.true(key) – Evaluate if a document key is True for the job(s).
FlowProject.pre(condition[, tag]) – Define and evaluate preconditions for operations.
FlowProject.pre.after(*other_funcs) – Precondition to run an operation after other operations.
FlowProject.pre.copy_from(*other_funcs) – Copy preconditions from other operation(s).
FlowProject.pre.false(key) – Evaluate if a document key is False for the job(s).
FlowProject.pre.isfile(filename) – Determine if the specified file exists for the job(s).
FlowProject.pre.never(func) – Return False.
FlowProject.pre.not_(condition) – Return not condition(*jobs) for the provided condition function.
FlowProject.pre.true(key) – Evaluate if a document key is True for the job(s).
FlowProject.run([jobs, names, pretend, np, …]) – Execute all eligible operations for the given selection.
FlowProject.scheduler_jobs(scheduler) – Fetch jobs from the scheduler.
FlowProject.submit([bundle_size, jobs, …]) – Submit function for the project's main submit interface.
- class flow.FlowProject(config=None, environment=None, entrypoint=None)[source]¶
  Bases: signac.contrib.project.Project
  A signac project class specialized for workflow management.
  This class is used to define, execute, and submit workflows based on operations and conditions.
  Users typically interact with this class through its command line interface.
  This is a typical example of how to use this class:

      @FlowProject.operation
      def hello(job):
          print('hello', job)

      FlowProject().main()

  Parameters:
  - config (signac.contrib.project._ProjectConfig) – A signac configuration, defaults to the configuration loaded from the current directory.
  - environment (flow.environment.ComputeEnvironment) – An environment to use for scheduler submission. If None, the environment is automatically identified. The default is None.
  - entrypoint (dict) – A dictionary with two possible keys: 'executable' and 'path'. The path represents the location of the script file (the script file must call FlowProject.main()). The executable represents the location of the Python interpreter used for the execution of BaseFlowOperation instances that are Python functions.
- ALIASES = {'active': 'A', 'error': 'E', 'held': 'H', 'inactive': 'I', 'queued': 'Q', 'registered': 'R', 'submitted': 'S', 'unknown': 'U'}¶
  Default aliases used within the status output.
- PRINT_STATUS_ALL_VARYING_PARAMETERS = True¶
  This constant can be used to signal that the print_status() method should automatically show all varying parameters.
- add_operation(**kwargs)[source]¶
  Deprecated since version 0.14: This will be removed in 0.16. Method has been removed.
- completed_operations(job)[source]¶
  Determine which operations have been completed for job.
  Parameters: job (Job) – The signac job handle.
  Yields: str – The names of the operations that are complete.
- detect_operation_graph()[source]¶
  Determine the directed acyclic graph given by operation conditions.
  In general, executing a given operation registered with a FlowProject just involves checking the operation's preconditions and postconditions to determine eligibility. More generally, however, the preconditions and postconditions define a directed acyclic graph that governs the execution of all operations. Visualizing this graph can be useful for finding logic errors in the specified conditions, and having this graph computed also enables additional execution modes. For example, using this graph it is possible to determine exactly which operations need to be executed in order to make a given operation eligible, so that the task of executing all necessary operations can be automated.
  The graph is determined by iterating over all pairs of operations and checking for equality of preconditions and postconditions. The algorithm builds an adjacency matrix based on whether the preconditions for one operation match the postconditions for another. The comparison of operations is conservative; by default, conditions must be composed of identical code to be identified as equal (technically, they must have equivalent bytecode, i.e. cond1.__code__.co_code == cond2.__code__.co_code). Users can specify that conditions should be treated as equal by providing tags to the operations.
  Given a FlowProject subclass defined in a module project.py, the output graph could be visualized using Matplotlib and NetworkX with the following code:

      import numpy as np
      import networkx as nx
      from matplotlib import pyplot as plt

      from project import Project

      project = Project()
      ops = project.operations.keys()
      adj = np.asarray(project.detect_operation_graph())

      plt.figure()
      g = nx.DiGraph(adj)
      pos = nx.spring_layout(g)
      nx.draw(g, pos)
      nx.draw_networkx_labels(
          g, pos,
          labels={key: name for key, name in zip(range(len(ops)), ops)})
      plt.show()

  Returns: The adjacency matrix of operation dependencies. A zero indicates no dependency, and a 1 indicates dependency. This can be converted to a graph using NetworkX.
  Return type: list of lists of int
  Raises: RuntimeError – If a condition does not have a tag. This can occur when using functools.partial and a manually specified condition tag has not been set.
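The bytecode-equality check described above can be demonstrated in plain Python, without signac-flow. This is an illustrative sketch only (dicts stand in for jobs, which is an assumption of the example): two conditions written with identical code compare equal, while a structurally different one does not.

```python
# Two conditions with identical code: their compiled bytecode
# (co_code) matches, so they would be identified as equal.
def cond_a(job):
    return job.get('done', False)

def cond_b(job):
    return job.get('done', False)

# A structurally different condition compiles to different bytecode.
def cond_c(job):
    return not job.get('done', False)

print(cond_a.__code__.co_code == cond_b.__code__.co_code)  # True
print(cond_a.__code__.co_code == cond_c.__code__.co_code)  # False
```

This is why conditions built with functools.partial need explicit tags: partial objects carry no __code__ attribute to compare.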
- get_job_status(job, ignore_errors=False, cached_status=None)[source]¶
  Return status information about a job.
  Parameters:
  - job (Job) – The signac job.
  - ignore_errors (bool) – Whether to ignore exceptions raised during status check. (Default value = False)
  - cached_status (dict) – Dictionary of cached status information. The keys are uniquely generated ids for each group and job. The values are instances of JobStatus. (Default value = None)
  Returns: A dictionary containing job status for all jobs.
  Return type: dict
- groups¶
  Get the dictionary of groups that have been added to the workflow.
- classmethod label(label_name_or_func=None)[source]¶
  Designate a function as a label function for this class.
  For example, we can define a label function like this:

      @FlowProject.label
      def foo_label(job):
          if job.document.get('foo', False):
              return 'foo-label-text'

  The foo-label-text label will now show up in the status view for each job where the foo key evaluates to true.
  If the label function returns any type other than str, the label name will be the name of the function if and only if the return value evaluates to True, for example:

      @FlowProject.label
      def foo_label(job):
          return job.document.get('foo', False)

  Finally, specify a label name by providing it as the first argument to the label() decorator.
  Parameters: label_name_or_func (str or callable) – A label name or callable. (Default value = None)
  Returns: A decorator for the label function.
  Return type: callable
- labels(job)[source]¶
  Yield all labels for the given job.
  See also: label().
  Parameters: job (signac.contrib.job.Job) – Job handle.
  Yields: str – Label value.
- main(parser=None)[source]¶
  Call this function to use the main command line interface.
  In most cases one would want to call this function as part of the class definition:

      # my_project.py
      from flow import FlowProject

      class MyProject(FlowProject):
          pass

      if __name__ == '__main__':
          MyProject().main()

  The project can then be executed on the command line:

      $ python my_project.py --help

  Parameters: parser (argparse.ArgumentParser) – The argument parser used to implement the command line interface. If None, a new parser is constructed. (Default value = None)
- classmethod make_group(name, options='')[source]¶
  Make a FlowGroup named name and return a decorator to make groups.
  A FlowGroup is used to group operations together for running and submitting _JobOperations.
  Examples
  The code below creates a group and adds an operation to that group.

      example_group = FlowProject.make_group('example')

      @example_group
      @FlowProject.operation
      def foo(job):
          return "hello world"

  Parameters:
  - name (str) – The name of the group.
  - options (str) – A string of command line options for the group. (Default value = '')
  Returns: The created group.
  Return type: FlowGroupEntry
- classmethod operation(func, name=None)[source]¶
  Add an operation function to the class workflow definition.
  This function is designed to be used as a decorator, for example:

      @FlowProject.operation
      def hello(job):
          print('Hello', job)

  Parameters:
  - func (callable) – The function to add to the workflow.
  - name (str) – The operation name. Uses the name of the function if None. (Default value = None)
  Returns: The operation function.
  Return type: callable
- operations¶
  Get the dictionary of operations that have been added to the workflow.
- print_status(jobs=None, overview=True, overview_max_lines=None, detailed=False, parameters=None, param_max_width=None, expand=False, all_ops=False, only_incomplete=False, dump_json=False, unroll=True, compact=False, pretty=False, file=None, err=None, ignore_errors=False, template=None, profile=False, eligible_jobs_max_lines=None, output_format='terminal')[source]¶
  Print the status of the project.
  Parameters:
  - jobs (iterable of Job) – Only print status for the given jobs, or all if the argument is None. (Default value = None)
  - overview (bool) – Display an overview of the project status. (Default value = True)
  - overview_max_lines (int) – Limit the number of overview lines. (Default value = None)
  - detailed (bool) – Print a detailed status of each job. (Default value = False)
  - parameters (list of str) – Print the value of the specified parameters. (Default value = None)
  - param_max_width (int) – Limit the number of characters of parameter columns. (Default value = None)
  - expand (bool) – Present labels and operations in two separate tables. (Default value = False)
  - all_ops (bool) – Include operations that are not eligible to run. (Default value = False)
  - only_incomplete (bool) – Only show jobs that have eligible operations. (Default value = False)
  - dump_json (bool) – Output the data as JSON instead of printing the formatted output. (Default value = False)
  - unroll (bool) – Separate columns for jobs and the corresponding operations. (Default value = True)
  - compact (bool) – Print a compact version of the output. (Default value = False)
  - pretty (bool) – Prettify the output. (Default value = False)
  - file (str) – Redirect all output to this file, defaults to sys.stdout.
  - err (str) – Redirect all error output to this file, defaults to sys.stderr.
  - ignore_errors (bool) – Print status even if querying the scheduler fails. (Default value = False)
  - template (str) – User-provided Jinja2 template file. (Default value = None)
  - profile (bool) – Show profile result. (Default value = False)
  - eligible_jobs_max_lines (int) – Limit the number of operations and their eligible job counts printed in the overview. (Default value = None)
  - output_format (str) – Status output format; supports 'terminal' (default), 'markdown', or 'html'.
- run(jobs=None, names=None, pretend=False, np=None, timeout=None, num=None, num_passes=1, progress=False, order=None, ignore_conditions=<IgnoreConditions.NONE: 0>)[source]¶
  Execute all eligible operations for the given selection.
  This function will run in an infinite loop until all eligible operations are executed, unless it reaches the maximum number of passes per operation or the maximum number of executions.
  By default there is no limit on the total number of executions, but a specific operation will only be executed once per job. This is to avoid accidental infinite loops when no postconditions, or faulty postconditions, are provided.
  Parameters:
  - jobs (iterable of Job) – Only execute operations for the given jobs, or all if the argument is None. (Default value = None)
  - names (iterable of str) – Only execute operations that match the provided set of names (interpreted as regular expressions), or all if the argument is None. (Default value = None)
  - pretend (bool) – Do not actually execute the operations, but show the commands that would have been executed. (Default value = False)
  - np (int) – Parallelize to the specified number of processors. Use -1 to parallelize to all available processing units. (Default value = None)
  - timeout (int) – An optional timeout for each operation in seconds, after which execution will be cancelled. Use -1 to indicate no timeout (the default).
  - num (int) – The total number of operations that are executed will not exceed this argument if provided. (Default value = None)
  - num_passes (int or None) – The total number of executions of one specific job-operation pair will not exceed this argument. The default is 1; there is no limit if this argument is None.
  - progress (bool) – Show a progress bar during execution. (Default value = False)
  - order (str, callable, or None) – Specify the order of operations. Possible values are:
    - 'none' or None (no specific order)
    - 'by-job' (operations are grouped by job)
    - 'cyclic' (operations are ordered cyclically by job)
    - 'random' (shuffle the execution order randomly)
    - callable (a callable returning a comparison key for an operation, used to sort operations)
    The default value is 'none', which is equivalent to 'by-job' in the current implementation.
    Note: Users are advised not to rely on a specific execution order as a substitute for defining the workflow in terms of preconditions and postconditions. However, a specific execution order may be more performant in cases where operations need to access and potentially lock shared resources.
  - ignore_conditions (IgnoreConditions) – Specify if preconditions and/or postconditions are to be ignored when determining eligibility. The default is IgnoreConditions.NONE.
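The difference between the 'by-job' and 'cyclic' orders can be illustrated with a pure-Python sketch (no signac-flow required); the (job, operation) pairs below are stand-ins for job-operations, not flow's actual scheduling code.

```python
from itertools import zip_longest

pending = {
    "job1": ["op_a", "op_b"],
    "job2": ["op_a", "op_b"],
}

# 'by-job': all operations of one job before moving to the next job.
by_job = [(job, op) for job, ops in pending.items() for op in ops]

# 'cyclic': rotate through the jobs, one operation at a time.
columns = [[(job, op) for op in ops] for job, ops in pending.items()]
cyclic = [pair for round_ in zip_longest(*columns)
          for pair in round_ if pair is not None]

print(by_job)   # job1's operations first, then job2's
print(cyclic)   # alternating between job1 and job2
```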
- scheduler_jobs(scheduler)[source]¶
  Fetch jobs from the scheduler.
  This function will fetch all scheduler jobs from the scheduler and also expand bundled jobs automatically.
  However, this function will not automatically filter scheduler jobs which are not associated with this project.
  Parameters: scheduler (Scheduler) – The scheduler instance.
  Yields: ClusterJob – All cluster jobs fetched from the scheduler.
- submit(bundle_size=1, jobs=None, names=None, num=None, parallel=False, force=False, ignore_conditions=<IgnoreConditions.NONE: 0>, ignore_conditions_on_execution=<IgnoreConditions.NONE: 0>, **kwargs)[source]¶
  Submit function for the project's main submit interface.
  Parameters:
  - bundle_size (int) – Specify the number of operations to be bundled into one submission, defaults to 1.
  - jobs (iterable of Job) – Only submit operations for the given jobs, or all if the argument is None. (Default value = None)
  - names (iterable of str) – Only submit operations that match the provided set of names (interpreted as regular expressions), or all if the argument is None. (Default value = None)
  - num (int) – Limit the total number of submitted operations, defaults to no limit.
  - parallel (bool) – Execute all bundled operations in parallel. (Default value = False)
  - force (bool) – Ignore all warnings or checks during submission, just submit. (Default value = False)
  - ignore_conditions (IgnoreConditions) – Specify if preconditions and/or postconditions are to be ignored when determining eligibility. The default is IgnoreConditions.NONE.
  - ignore_conditions_on_execution (IgnoreConditions) – Specify if preconditions and/or postconditions are to be ignored when determining eligibility after submitting. The default is IgnoreConditions.NONE.
  - **kwargs – Additional keyword arguments forwarded to submit().
- FlowProject.post(condition, tag=None)¶
  Define and evaluate postconditions for operations.
  A postcondition is a function accepting one or more jobs as positional arguments (*jobs) that must evaluate to True for this operation to be considered complete. For example:

      @Project.operation
      @Project.post(lambda job: job.doc.get('bye'))
      def bye(job):
          print('bye', job)
          job.doc.bye = True

  The bye operation would be considered complete, and therefore no longer eligible for execution, once the 'bye' key in the job document evaluates to True.
  An optional tag may be associated with the condition. These tags are used by detect_operation_graph() when comparing conditions for equality. The tag defaults to the bytecode of the function.
- classmethod post.copy_from(*other_funcs)¶
  Copy postconditions from other operation(s).
  True if and only if all postconditions of other operation function(s) are met.
- classmethod post.false(key)¶
  Evaluate if a document key is False for the job(s).
  Returns True if the specified key is present in the job document(s) and evaluates to False.
- classmethod post.isfile(filename)¶
  Determine if the specified file exists for the job(s).
- classmethod post.never(func)¶
  Return False.
- classmethod post.not_(condition)¶
  Return not condition(*jobs) for the provided condition function.
- classmethod post.true(key)¶
  Evaluate if a document key is True for the job(s).
  Returns True if the specified key is present in the job document(s) and evaluates to True.
- FlowProject.pre(condition, tag=None)¶
  Define and evaluate preconditions for operations.
  A precondition is a function accepting one or more jobs as positional arguments (*jobs) that must evaluate to True for this operation to be eligible for execution. For example:

      @Project.operation
      @Project.pre(lambda job: not job.doc.get('hello'))
      def hello(job):
          print('hello', job)
          job.doc.hello = True

  The hello operation would only execute if the 'hello' key in the job document does not evaluate to True.
  An optional tag may be associated with the condition. These tags are used by detect_operation_graph() when comparing conditions for equality. The tag defaults to the bytecode of the function.
- classmethod pre.after(*other_funcs)¶
  Precondition to run an operation after other operations.
  True if and only if all postconditions of other operation function(s) are met.
- classmethod pre.copy_from(*other_funcs)¶
  Copy preconditions from other operation(s).
  True if and only if all preconditions of other operation function(s) are met.
- classmethod pre.false(key)¶
  Evaluate if a document key is False for the job(s).
  Returns True if the specified key is present in the job document(s) and evaluates to False.
- classmethod pre.isfile(filename)¶
  Determine if the specified file exists for the job(s).
- classmethod pre.never(func)¶
  Return False.
- classmethod pre.not_(condition)¶
  Return not condition(*jobs) for the provided condition function.
- classmethod pre.true(key)¶
  Evaluate if a document key is True for the job(s).
  Returns True if the specified key is present in the job document(s) and evaluates to True.
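The true, false, and not_ condition helpers above can be sketched in plain Python. The versions below are simplified stand-ins operating on a dict used in place of a job document; they are not flow's implementation.

```python
# Simplified stand-ins for the condition helper factories described above.
# A dict plays the role of a job document here.
def true(key):
    return lambda job: key in job and bool(job[key])

def false(key):
    return lambda job: key in job and not bool(job[key])

def not_(condition):
    return lambda *jobs: not condition(*jobs)

doc = {'done': True, 'failed': False}
print(true('done')(doc))        # True: key present and truthy
print(false('failed')(doc))     # True: key present and falsy
print(not_(true('done'))(doc))  # False: negation of a met condition
```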
Operations and Status¶
- class flow.project.BaseFlowOperation(pre=None, post=None)[source]¶
  Bases: object
  A BaseFlowOperation represents a data space operation acting on any job.
  Every BaseFlowOperation is associated with a specific command.
  Preconditions (pre) and postconditions (post) can be used to trigger an operation only when certain conditions are met. Conditions are unary callables, which expect an instance of job as their first and only positional argument and return either True or False.
  An operation is considered "eligible" for execution when all preconditions are met and when at least one of the postconditions is not met. Preconditions are always met when the list of preconditions is empty. Postconditions are never met when the list of postconditions is empty.
  Note: This class should not be instantiated directly.
  Parameters:
  - pre (sequence of callables) – List of preconditions.
  - post (sequence of callables) – List of postconditions.
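The eligibility rule described above can be expressed compactly; the sketch below is a simplified stand-in (dicts in place of jobs), not flow's actual implementation.

```python
# An operation is complete when the postcondition list is non-empty and
# every postcondition holds; it is eligible when every precondition
# holds and it is not complete.
def is_complete(job, post):
    return bool(post) and all(cond(job) for cond in post)

def is_eligible(job, pre, post):
    return all(cond(job) for cond in pre) and not is_complete(job, post)

job = {'input_ready': True, 'output_written': False}
pre = [lambda j: j['input_ready']]
post = [lambda j: j['output_written']]

print(is_eligible(job, pre, post))  # True: preconditions met, not complete
job['output_written'] = True
print(is_eligible(job, pre, post))  # False: postconditions now met
```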
- class flow.project.FlowOperation(op_func, pre=None, post=None)[source]¶
  Bases: flow.project.BaseFlowOperation
  An operation that executes a Python function.
  All operations without the @cmd directive use this class. The callable op_func should be a function of one or more instances of Job.
  Note: This class should not be instantiated directly.
  Parameters:
  - op_func (callable) – A callable function of *jobs.
  - pre (sequence of callables) – List of preconditions.
  - post (sequence of callables) – List of postconditions.
- class flow.project.FlowCmdOperation(cmd, pre=None, post=None)[source]¶
  Bases: flow.project.BaseFlowOperation
  An operation that executes a shell command.
  When an operation has the @cmd directive specified, it is instantiated as a FlowCmdOperation. The operation should be a function of one or more positional arguments that are instances of Job. The command (cmd) may either be a callable that expects one or more instances of Job as positional arguments and returns a string containing valid shell commands, or the string of commands itself. In either case, the resulting string may contain any attributes of the job (or jobs) placed in curly braces, which will then be substituted by Python string formatting.
  Note: This class should not be instantiated directly.
  Parameters:
  - cmd (str or callable) – The command to execute the operation. Callable values will be provided one or more positional arguments (*jobs) that are instances of Job. String values will be formatted with cmd.format(jobs=jobs), where jobs is a tuple of Job, or cmd.format(jobs=jobs, job=job) if only one job is provided.
  - pre (sequence of callables) – List of preconditions.
  - post (sequence of callables) – List of postconditions.
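The curly-brace substitution described above is ordinary Python string formatting. The stand-in job class below is hypothetical, used only to show how attribute access inside the braces resolves.

```python
# Hypothetical stand-in for a signac Job, illustrating how attributes
# referenced in curly braces are substituted via str.format.
class FakeJob:
    def __init__(self, id, ws):
        self.id = id
        self.ws = ws

job = FakeJob(id="abc123", ws="/tmp/workspace/abc123")
cmd = "cd {job.ws} && echo {job.id}"
print(cmd.format(job=job))  # cd /tmp/workspace/abc123 && echo abc123
```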
Labels¶
- class flow.label(name=None)[source]¶
  Bases: object
  Decorate a FlowProject class function as a label function.
  For example:

      class MyProject(FlowProject):

          @label()
          def foo(self, job):
              return True
- class flow.staticlabel(name=None)[source]¶
  Bases: flow.labels.label
  A label decorator for staticmethods.
  This decorator implies "staticmethod"!
@flow.cmd¶
- flow.cmd(func)[source]¶
  Indicate that func returns a shell command with this decorator.
  If this function is an operation function defined by FlowProject, it will be interpreted to return a shell command, instead of executing the function itself.
  For example:

      @FlowProject.operation
      @flow.cmd
      def hello(job):
          return "echo {job.id}"
@flow.with_job¶
- flow.with_job(func)[source]¶
  Use arg as a context manager for func(arg) with this decorator.
  If this function is an operation function defined by FlowProject, it will be the same as using with job:.
  For example:

      @FlowProject.operation
      @flow.with_job
      def hello(job):
          print("hello {}".format(job))

  is equivalent to:

      @FlowProject.operation
      def hello(job):
          with job:
              print("hello {}".format(job))

  This also works with the @cmd decorator:

      @FlowProject.operation
      @with_job
      @cmd
      def hello(job):
          return "echo 'hello {}'".format(job)

  is equivalent to:

      @FlowProject.operation
      @cmd
      def hello_cmd(job):
          return 'trap "cd `pwd`" EXIT && cd {} && echo "hello {job}"'.format(job.ws)
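The equivalence above boils down to entering the function's argument as a context manager around the call. A minimal sketch of such a decorator follows, using a hypothetical context-manager class in place of a signac Job (a real Job switches into its workspace directory on enter and back on exit).

```python
import functools

# Minimal with_job-style decorator sketch: the wrapped function's
# argument is entered as a context manager around the call.
def with_arg(func):
    @functools.wraps(func)
    def wrapper(arg):
        with arg:
            return func(arg)
    return wrapper

events = []

class FakeJob:
    # Hypothetical stand-in that records enter/exit ordering.
    def __enter__(self):
        events.append('enter')
        return self
    def __exit__(self, *exc):
        events.append('exit')
        return False

@with_arg
def hello(job):
    events.append('body')

hello(FakeJob())
print(events)  # ['enter', 'body', 'exit']
```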
@flow.directives¶
- class flow.directives(**kwargs)[source]¶
  Bases: object
  Decorator for operation functions to provide additional execution directives.
  Directives can for example be used to provide information about required resources, such as the number of processes required for execution of parallelized operations. For more information, read about Submission Directives.
Supported Directives:

fork
  The fork directive can be set to True to enforce that a particular operation is always executed within a subprocess and not within the Python interpreter's process, even if there are no other reasons that would prevent that.
  Note: Setting fork=False will not prevent forking if there are other reasons for forking, such as a timeout.

np
  The number of tasks to launch for a given operation, i.e., the number of CPU cores to be requested for a given operation.
  Expects a natural number (i.e. an integer >= 1). This directive inspects the "nranks" and "omp_num_threads" directives and uses their product if it is greater than the value currently set. Defaults to 1.

ngpu
  The number of GPUs to use for this operation.
  Expects a nonnegative integer. Defaults to 0.

nranks
  The number of MPI ranks to use for this operation.
  Expects a nonnegative integer. Defaults to 0.

omp_num_threads
  The number of OpenMP threads to use for this operation.
  Expects a nonnegative integer. Defaults to 0.

executable
  Return the path to the executable to be used for an operation.
  The executable directive expects a string pointing to a valid executable file in the current file system. When called, by default this should point to a Python executable (interpreter); however, if the FlowProject path is an empty string, the executable can be a path to an executable Python script. Defaults to sys.executable.

walltime
  The number of hours to request for executing this job.
  This directive expects a float representing the walltime in hours. Fractional values are supported. For example, a value of 0.5 will request 30 minutes of walltime. If no walltime is requested, the submission will not specify a walltime in the output script. Some schedulers have a default value that will be used.
  For example:

      @Project.operation
      @directives(walltime=24)
      def op(job):
          # This operation takes 1 day to run
          pass
processor_fraction
  Fraction of a resource to use on a single operation.
  If set to 0.5 for a bundled job with 20 operations (all with 'np' set to 1), 10 CPUs will be used. Defaults to 1.
  Note: This can be particularly useful on Stampede2's launcher.

memory
  The memory to request for this operation.
  The memory to validate should be either a float, int, or string. A valid memory argument is defined as:
  - A positive numeric value with suffix "g" or "G", indicating memory requested in gigabytes. For example:

        @Project.operation
        @directives(memory="4g")
        def op(job):
            pass

  - A positive numeric value with suffix "m" or "M", indicating memory requested in megabytes. For example:

        @Project.operation
        @directives(memory="512m")
        def op(job):
            pass

  - A positive numeric value with no suffix, indicating memory requested in gigabytes. For example:

        @Project.operation
        @directives(memory="4")
        def op1(job):
            pass

        @Project.operation
        @directives(memory=4)
        def op2(job):
            pass
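The accepted memory formats above can be normalized to a single unit. The helper below is a hypothetical sketch for illustration, not flow's validation code.

```python
# Hypothetical normalizer for the memory formats listed above:
# "4g"/"4G" are gigabytes, "512m"/"512M" are megabytes (converted to
# gigabytes), and bare numbers are interpreted as gigabytes.
def memory_in_gb(value):
    s = str(value).strip()
    if s and s[-1] in 'gG':
        return float(s[:-1])
    if s and s[-1] in 'mM':
        return float(s[:-1]) / 1024
    return float(s)

print(memory_in_gb("4g"))    # 4.0
print(memory_in_gb("512m"))  # 0.5
print(memory_in_gb(4))       # 4.0
```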
flow.init()¶
- flow.init(alias=None, template=None, root=None)[source]¶
  Initialize a templated FlowProject module.
  Parameters:
  - alias (str) – Python identifier used as a file name for the template output. Uses "project" if None. (Default value = None)
  - template (str) – Name of the template to use. Built-in templates are "minimal", "example", and "testing". Uses "minimal" if None. (Default value = None)
  - root (str) – Directory where the output file is placed. Uses the current working directory if None. (Default value = None)
  Returns: List of file names created.
  Return type: list
flow.get_environment()¶
- flow.get_environment(test=False, import_configured=True)[source]¶
  Attempt to detect the present environment.
  This function iterates through all defined ComputeEnvironment classes in reversed order of definition and returns the first environment whose is_present() method returns True.
  Parameters:
  - test (bool) – (Default value = False)
  - import_configured (bool) – (Default value = True)
  Returns: The detected environment class.
The FlowGroup¶
- class flow.project.FlowGroup(name, operations=None, operation_directives=None, options='')[source]¶
  Bases: object
  A FlowGroup represents a subset of a workflow for a project.
  A FlowGroup is associated with one or more instances of BaseFlowOperation.
  Examples
  In the example below, the directives will be {'nranks': 4} for op1 and {'nranks': 2, 'executable': 'python3'} for op2.

      group = FlowProject.make_group(name='example_group')

      @group.with_directives(nranks=4)
      @FlowProject.operation
      @directives(nranks=2, executable="python3")
      def op1(job):
          pass

      @group
      @FlowProject.operation
      @directives(nranks=2, executable="python3")
      def op2(job):
          pass

  Parameters:
  - name (str) – The name of the group to be used when calling from the command line.
  - operations (dict) – A dictionary of operations where the keys are operation names and each value is a BaseFlowOperation.
  - operation_directives (dict) – A dictionary of additional parameters that provide instructions on how to execute a particular operation, e.g., specifically required resources. Operation names are keys and the dictionaries of directives are values. If an operation does not have directives specified, then the directives of the singleton group containing that operation are used. To prevent this, set the directives to an empty dictionary for that operation.
  - options (str) – A string of options to append to the output of the object's call method. This allows options like --num_passes to be given to a group.
- add_operation(name, operation, directives=None)[source]¶
  Add an operation to the FlowGroup.
  Parameters:
  - name (str) – The name of the operation.
  - operation (BaseFlowOperation) – The workflow operation to add to the FlowGroup.
  - directives (dict) – The operation-specific directives. (Default value = None)
- class flow.project.FlowGroupEntry(name, options='')[source]¶
  Bases: object
  A FlowGroupEntry registers operations for inclusion into a FlowGroup.
  Application developers should not directly instantiate this class, but use make_group() instead.
  Operation functions can be marked for inclusion into a FlowGroup by decorating the functions with a corresponding FlowGroupEntry. If the operation requires specific directives, with_directives() accepts keyword arguments that are mapped to directives and returns a decorator that can be applied to the operation to mark it for inclusion in the group and indicate that it should be executed using the specified directives. This overrides the default directives specified by flow.directives().
  Parameters:
  - name (str) – The name of the group.
  - options (str) – A string of command line options for the group. (Default value = '')
  - __call__(func)[source]¶
    Add the function into the group's operations.
    This call operator allows the class to be used as a decorator.
    Parameters: func (callable) – The function to decorate.
    Returns: The decorated function.
    Return type: callable
  - with_directives(directives)[source]¶
    Return a decorator that sets group-specific directives for the operation.
    Parameters: directives (dict) – Directives to use for resource requests and running the operation through the group.
    Returns: A decorator which registers the function into the group with the specified directives.
    Return type: function
Compute Environments¶
Detection of compute environments.
This module provides the ComputeEnvironment
class, which can be
subclassed to automatically detect specific computational environments.
This enables users to adjust their workflows based on the present environment, e.g., to modify scheduler submission scripts.
-
flow.environment.
setup
(py_modules, **attrs)[source]¶ Set up user-defined environment modules.
Use this function in place of
setuptools.setup()
to not only install an environment’s module, but also register it with the global signac configuration. Once registered, the environment is automatically imported when the get_environment()
function is called.
-
flow.environment.
template_filter
(func)[source]¶ Decorate a function as a
ComputeEnvironment
template filter.This decorator is applied to methods defined in a subclass of
ComputeEnvironment
that are used in that environment’s templates. The decorated function becomes a class method that is available as a jinja2 filter in templates rendered by a FlowProject
with that ComputeEnvironment
.Parameters: func (callable) – Function to decorate. Returns: Decorated function. Return type: callable
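The effect of such a decorator can be sketched without jinja2: it marks the function and converts it into a class method, so a templating layer can later collect the marked methods into a filter registry. The names below are illustrative, not flow's internals:

```python
# Sketch of a template_filter-style decorator: it tags the function and
# wraps it as a classmethod; a collection step then gathers the tagged
# methods into a per-class filter table, the way a template engine could
# expose them as filters. Illustrative only.
def template_filter(func):
    func._is_template_filter = True
    return classmethod(func)


class MyEnvironment:
    @template_filter
    def format_walltime(cls, hours):
        # Example filter: render a walltime in HH:MM:SS form.
        minutes = int(round(hours * 60))
        return "{:02d}:{:02d}:00".format(minutes // 60, minutes % 60)


# Collect the tagged methods the way a template engine could look them up.
filters = {
    name: getattr(MyEnvironment, name)
    for name, attr in vars(MyEnvironment).items()
    if getattr(getattr(attr, "__func__", attr), "_is_template_filter", False)
}
```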
-
class
flow.environment.
ComputeEnvironment
[source]¶ Bases:
object
Define computational environments.
The ComputeEnvironment class makes it possible to automatically detect specific environments and programmatically adjust workflows accordingly.
The default method for the detection of a specific environment is to provide a regular expression matching the environment’s hostname. For example, if the hostname is
my-server.com
, one could identify the environment by setting the hostname_pattern
to 'my-server'
.-
classmethod
add_args
(parser)[source]¶ Add arguments related to this compute environment to an argument parser.
Parameters: parser ( argparse.ArgumentParser
) – The argument parser where arguments will be added.
-
classmethod
get_config_value
(key, default=<flow.util.config._GetConfigValueNoneType object>)[source]¶ Request a value from the user’s configuration.
This method should be used whenever values need to be provided that are specific to a user’s environment, e.g. account names.
When a key is not configured and no default value is provided, a
SubmitError
will be raised and the user will be prompted to add the missing key to their configuration. Please note that the key will be automatically expanded to be specific to this environment definition. For example, a key should be
'account'
, not 'MyEnvironment.account'
.Parameters: Returns: The value or default value.
Return type: Raises: SubmitError
– If the key is not in the user’s configuration and no default value is provided.
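The lookup semantics can be sketched with a plain dictionary standing in for the user's configuration. All names below are illustrative, and KeyError stands in for flow's SubmitError:

```python
# Sketch of get_config_value-style lookup: a sentinel default mirrors the
# _GetConfigValueNoneType object in the real signature, and the key is
# expanded to be environment-specific. Illustrative only.
_NOT_SET = object()  # sentinel: distinguishes "no default given" from None


def get_config_value(config, environment, key, default=_NOT_SET):
    # 'account' is expanded to 'MyEnvironment.account', as described above.
    full_key = "{}.{}".format(environment, key)
    if full_key in config:
        return config[full_key]
    if default is _NOT_SET:
        # signac-flow raises SubmitError here and prompts the user to add
        # the missing key to their configuration.
        raise KeyError("Missing configuration key: " + full_key)
    return default


config = {"MyEnvironment.account": "abc123"}
```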
-
classmethod
get_prefix
(operation, parallel=False, mpi_prefix=None, cmd_prefix=None)[source]¶ Template filter generating a command prefix from directives.
Parameters: - operation (
flow.project._JobOperation
) – The operation to be prefixed. - parallel (bool) – If True, operations are assumed to be executed in parallel, which means that the number of total tasks is the sum of all tasks instead of the maximum number of tasks. Default is set to False.
- mpi_prefix (str) – User defined mpi_prefix string. Default is set to None. This will be deprecated and removed in the future.
- cmd_prefix (str) – User defined cmd_prefix string. Default is set to None. This will be deprecated and removed in the future.
Returns: The prefix to be added to the operation’s command.
Return type: str
-
classmethod
get_scheduler
()[source]¶ Return an environment-specific scheduler driver.
The returned scheduler class provides a standardized interface to different scheduler implementations.
-
classmethod
is_present
()[source]¶ Determine whether this specific compute environment is present.
The default method for environment detection is to try to match a hostname pattern or to delegate the environment check to the associated scheduler type.
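The hostname-based detection described above amounts to a regular-expression match against the machine's hostname. A minimal sketch, with a made-up pattern and hostnames (at run time, `socket.gethostname()` would supply the real hostname):

```python
import re

# Hypothetical pattern an environment subclass might set.
hostname_pattern = r"my-server"


def is_present(hostname):
    # The environment is considered "present" when the pattern
    # matches the hostname.
    return re.match(hostname_pattern, hostname) is not None
```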
-
classmethod
submit
(script, flags=None, *args, **kwargs)[source]¶ Submit a job submission script to the environment’s scheduler.
Scripts should be submitted to the environment instead of directly to the scheduler, to allow for environment-specific post-processing.
Parameters: Returns: Status of job, if submitted.
Return type: JobStatus.submitted or None
-
class
flow.environment.
StandardEnvironment
[source]¶ Bases:
flow.environment.ComputeEnvironment
Default environment which is always present.
-
class
flow.environment.
NodesEnvironment
[source]¶ A compute environment consisting of multiple compute nodes.
Each compute node is assumed to have a specific number of compute units, e.g., CPUs.
Deprecated since version 0.14: This will be removed in 0.15. NodesEnvironment has been deprecated.
-
class
flow.environment.
TestEnvironment
[source]¶ Bases:
flow.environment.ComputeEnvironment
Environment used for testing.
The test environment will print a mocked submission script and submission commands to screen. This enables testing of the job submission script generation in environments without a real scheduler.
-
scheduler_type
¶
-
-
class
flow.environment.
SimpleSchedulerEnvironment
[source]¶ Bases:
flow.environment.ComputeEnvironment
An environment for the simple-scheduler scheduler.
-
scheduler_type
¶
-
-
class
flow.environment.
PBSEnvironment
[source]¶ An environment with a PBS scheduler.
Deprecated since version 0.14: This will be removed in 0.15. PBSEnvironment has been deprecated; use DefaultPBSEnvironment instead.
-
class
flow.environment.
SlurmEnvironment
[source]¶ An environment with a SLURM scheduler.
Deprecated since version 0.14: This will be removed in 0.15. SlurmEnvironment has been deprecated; use DefaultSlurmEnvironment instead.
-
class
flow.environment.
LSFEnvironment
[source]¶ An environment with an LSF scheduler.
Deprecated since version 0.14: This will be removed in 0.15. LSFEnvironment has been deprecated; use DefaultLSFEnvironment instead.
-
flow.environment.
registered_environments
(import_configured=True)[source]¶ Return a list of registered environments.
Parameters: import_configured (bool) – Whether to import environments specified in the flow configuration. (Default value = True) Returns: List of registered environments. Return type: list
Schedulers¶
Defines the API for the scheduling system.
-
class
flow.scheduling.base.
JobStatus
[source]¶ Bases:
enum.IntEnum
Classifies the job’s execution status.
-
active
= 7¶ The cluster job is actively running.
-
error
= 8¶ The cluster job is in an error or failed state.
-
held
= 5¶ The cluster job is held.
-
inactive
= 3¶ The cluster job is inactive.
This includes states like completed, cancelled, or timed out.
-
placeholder
= 127¶ A placeholder state that is used for status rendering when no operations are eligible.
-
queued
= 6¶ The cluster job is queued.
-
registered
= 2¶ The cluster job is registered with the scheduler, but no other status is known.
-
submitted
= 4¶ The cluster job has been submitted.
Note that this state is never returned by a scheduler, but is an assumed state immediately after a cluster job is submitted.
-
unknown
= 1¶ Unknown cluster job status.
-
user
= 128¶ All user-defined states must be >=128 in value.
-
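The documented values can be reproduced with a stdlib IntEnum to show why the type is integer-based: statuses compare numerically, so a "most progressed" status can be computed with max(). This is a local mirror of the table above, not an import of flow:

```python
from enum import IntEnum


class JobStatus(IntEnum):
    # Values copied from the flow.scheduling.base.JobStatus documentation.
    unknown = 1
    registered = 2
    inactive = 3
    submitted = 4
    held = 5
    queued = 6
    active = 7
    error = 8
    placeholder = 127
    user = 128


# Because JobStatus is an IntEnum, members compare numerically, so the
# "most progressed" status of several cluster jobs is simply the max.
statuses = [JobStatus.queued, JobStatus.active, JobStatus.held]
```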
-
class
flow.scheduling.base.
ClusterJob
(job_id, status=None)[source]¶ Bases:
object
Class representing a cluster job.
-
class
flow.scheduling.base.
Scheduler
[source]¶ Bases:
abc.ABC
Abstract base class for schedulers.
-
jobs
()[source]¶ Yield all cluster jobs.
Yields: ClusterJob
– Cluster job.
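A minimal custom scheduler following this interface might look like the sketch below. A stand-in abstract base is defined locally so the example is self-contained; the real base class is flow.scheduling.base.Scheduler, and the in-memory subclass is hypothetical:

```python
from abc import ABC, abstractmethod


class Scheduler(ABC):
    # Local stand-in mirroring the documented abstract interface.
    @abstractmethod
    def jobs(self):
        """Yield all cluster jobs."""


class InMemoryScheduler(Scheduler):
    """Hypothetical scheduler that only records submissions in memory."""

    def __init__(self):
        self._jobs = []

    def submit(self, script):
        # Record the script; a real scheduler driver would hand it to a
        # command such as sbatch or qsub.
        job_id = "job-{}".format(len(self._jobs))
        self._jobs.append(job_id)
        return True

    def jobs(self):
        # Yield known cluster jobs, as required by the abstract interface.
        yield from self._jobs


sched = InMemoryScheduler()
sched.submit("#!/bin/bash\necho hello")
```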
-
-
class
flow.scheduling.fake_scheduler.
FakeScheduler
[source]¶ Bases:
flow.scheduling.base.Scheduler
Implementation of the abstract
Scheduler
class for a fake scheduler.This scheduler does not actually schedule (or execute) any jobs, but it can be used to test the submission workflow.
-
classmethod
is_present
()[source]¶ Return False.
The
FakeScheduler
is never present unless manually specified.
-
class
flow.scheduling.lsf.
LSFScheduler
(user=None)[source]¶ Bases:
flow.scheduling.base.Scheduler
Implementation of the abstract Scheduler class for LSF schedulers.
This class can submit cluster jobs to an LSF scheduler and query their current status.
Parameters: - user (str) – Limit the status information to cluster jobs submitted by user.
- **kwargs – Forwarded to the parent constructor.
-
submit
(script, *, after=None, hold=False, pretend=False, flags=None, **kwargs)[source]¶ Submit a job script for execution to the scheduler.
Parameters: - script (str) – The job script submitted for execution.
- after (str) – Execute the submitted script after a job with this id has completed. (Default value = None)
- hold (bool) – Whether to hold the job upon submission. (Default value = False)
- pretend (bool) – If True, do not actually submit the script, but only simulate the submission. Can be used to test whether the submission would be successful. Please note: a successful “pretend” submission does not guarantee that an actual submission would succeed. (Default value = False)
- flags (list) – Additional arguments to pass through to the scheduler submission command. (Default value = None)
- **kwargs – Additional keyword arguments (ignored).
Returns: True if the submission command succeeds (or in pretend mode).
Return type: Raises: SubmitError
– If the submission command fails.
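The pretend mode shared by these submit() methods can be sketched generically. The command name and structure below are illustrative, not flow's actual implementation:

```python
# Sketch of a scheduler submit() with pretend mode: in pretend mode the
# command and script are only printed, nothing is submitted. Illustrative
# only; a real driver would pipe the script into the submission command
# and raise SubmitError on failure.
def submit(script, pretend=False, flags=None):
    cmd = ["bsub"] + (flags or [])  # hypothetical LSF submission command
    if pretend:
        # Only show what would be run; nothing reaches a scheduler.
        print("# Submit command:", " ".join(cmd))
        print(script)
        return True
    # A real implementation would execute `cmd` here with `script` on
    # stdin, returning True on success.
    raise NotImplementedError("no real scheduler in this sketch")
```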
-
class
flow.scheduling.pbs.
PBSScheduler
(user=None)[source]¶ Bases:
flow.scheduling.base.Scheduler
Implementation of the abstract Scheduler class for PBS schedulers.
This class can submit cluster jobs to a PBS scheduler and query their current status.
Parameters: - user (str) – Limit the status information to cluster jobs submitted by user.
- **kwargs – Forwarded to the parent constructor.
-
submit
(script, *, after=None, hold=False, pretend=False, flags=None, **kwargs)[source]¶ Submit a job script for execution to the scheduler.
Parameters: - script (str) – The job script submitted for execution.
- after (str) – Execute the submitted script after a job with this id has completed. (Default value = None)
- hold (bool) – Whether to hold the job upon submission. (Default value = False)
- pretend (bool) – If True, do not actually submit the script, but only simulate the submission. Can be used to test whether the submission would be successful. Please note: a successful “pretend” submission does not guarantee that an actual submission would succeed. (Default value = False)
- flags (list) – Additional arguments to pass through to the scheduler submission command. (Default value = None)
- **kwargs – Additional keyword arguments (ignored).
Returns: True if the submission command succeeds (or in pretend mode).
Return type: Raises: SubmitError
– If the submission command fails.
-
class
flow.scheduling.simple_scheduler.
SimpleScheduler
[source]¶ Bases:
flow.scheduling.base.Scheduler
Implementation of the abstract Scheduler class for the bundled
simple-scheduler
.The package signac-flow includes a script in
bin/simple-scheduler
that is a simple model of a cluster job scheduler. The simple-scheduler
script is designed primarily for testing and demonstration. This class can submit cluster jobs to the built-in simple scheduler and query their current status.
-
submit
(script, *, pretend=False, **kwargs)[source]¶ Submit a job script for execution to the scheduler.
Parameters: - script (str) – The job script submitted for execution.
- pretend (bool) – If True, do not actually submit the script, but only simulate the submission. Can be used to test whether the submission would be successful. A successful “pretend” submission does not guarantee that an actual submission would succeed. (Default value = False)
- **kwargs – Keyword arguments (ignored).
Returns: True if the submission command succeeds (or in pretend mode).
Return type: Raises: SubmitError
– If the submission command fails.
-
-
class
flow.scheduling.slurm.
SlurmScheduler
(user=None)[source]¶ Bases:
flow.scheduling.base.Scheduler
Implementation of the abstract Scheduler class for SLURM schedulers.
This class can submit cluster jobs to a SLURM scheduler and query their current status.
Parameters: - user (str) – Limit the status information to cluster jobs submitted by user.
- **kwargs – Forwarded to the parent constructor.
-
submit
(script, *, after=None, hold=False, pretend=False, flags=None, **kwargs)[source]¶ Submit a job script for execution to the scheduler.
Parameters: - script (str) – The job script submitted for execution.
- after (str) – Execute the submitted script after a job with this id has completed. (Default value = None)
- hold (bool) – Whether to hold the job upon submission. (Default value = False)
- pretend (bool) – If True, do not actually submit the script, but only simulate the submission. Can be used to test whether the submission would be successful. Please note: a successful “pretend” submission does not guarantee that an actual submission would succeed. (Default value = False)
- flags (list) – Additional arguments to pass through to the scheduler submission command. (Default value = None)
- **kwargs – Additional keyword arguments (ignored).
Returns: True if the submission command succeeds (or in pretend mode).
Return type: Raises: SubmitError
– If the submission command fails.
Error Classes¶
Definitions of the exception classes used in this package.
-
exception
flow.errors.
ConfigKeyError
[source]¶ Bases:
KeyError
Indicates that a config key was not found.
-
exception
flow.errors.
DirectivesError
[source]¶ Bases:
ValueError
Indicates that a directive was incorrectly set.
-
exception
flow.errors.
NoSchedulerError
[source]¶ Bases:
AttributeError
Indicates that there is no scheduler type defined for an environment class.
-
exception
flow.errors.
SubmitError
[source]¶ Bases:
RuntimeError
Indicates an error during cluster job submission.
-
class
flow.errors.
TemplateError
(environment)[source]¶ Bases:
jinja2.ext.Extension
Indicates an error in a jinja2 template.
-
exception
flow.errors.
UserConditionError
[source]¶ Bases:
RuntimeError
Indicates an error during evaluation of a condition.
-
exception
flow.errors.
UserOperationError
[source]¶ Bases:
RuntimeError
Indicates an error during execution of a
BaseFlowOperation
.