API Reference
This is the API for the signac-flow application.
The FlowProject
- class flow.FlowProject(config=None, environment=None, entrypoint=None)[source]¶
Bases: signac.contrib.project.Project
A signac project class specialized for workflow management.
This class is used to define, execute, and submit workflows based on operations and conditions.
Users typically interact with this class through its command line interface.
This is a typical example of how to use this class:
    @FlowProject.operation
    def hello(job):
        print('hello', job)

    FlowProject().main()
- Parameters
config (signac.contrib.project._ProjectConfig) – A signac configuration, defaults to the configuration loaded from the current directory.
environment (flow.environment.ComputeEnvironment) – An environment to use for scheduler submission. If None, the environment is automatically identified. The default is None.
entrypoint (dict) – A dictionary with two possible keys: 'executable' and 'path'. The path represents the location of the script file (the script file must call FlowProject.main()). The executable represents the location of the Python interpreter used for the execution of BaseFlowOperation instances that are Python functions.
- ALIASES = {'active': 'A', 'error': 'E', 'group_active': 'GA', 'group_error': 'GE', 'group_held': 'GH', 'group_inactive': 'GI', 'group_queued': 'GQ', 'group_registered': 'GR', 'group_submitted': 'GS', 'held': 'H', 'inactive': 'I', 'queued': 'Q', 'registered': 'R', 'submitted': 'S', 'unknown': 'U'}¶
Default aliases used within the status output.
- PRINT_STATUS_ALL_VARYING_PARAMETERS = True¶
This constant can be used to signal that the print_status() method is supposed to automatically show all varying parameters.
- completed_operations(job)[source]¶
Determine which operations have been completed for job.
- Parameters
job (Job) – The signac job handle.
- Yields
str – The names of the operations that are complete.
- detect_operation_graph()[source]¶
Determine the directed acyclic graph given by operation conditions.
In general, executing a given operation registered with a FlowProject just involves checking the operation’s preconditions and postconditions to determine eligibility. More generally, however, the preconditions and postconditions define a directed acyclic graph that governs the execution of all operations. Visualizing this graph can be useful for finding logic errors in the specified conditions, and having this graph computed also enables additional execution modes. For example, using this graph it is possible to determine exactly what operations need to be executed in order to make the operation eligible so that the task of executing all necessary operations can be automated.
The graph is determined by iterating over all pairs of operations and checking for equality of preconditions and postconditions. The algorithm builds an adjacency matrix based on whether the preconditions for one operation match the postconditions for another. The comparison of operations is conservative; by default, conditions must be composed of identical code to be identified as equal (technically, they must have equivalent bytecode, i.e. cond1.__code__.co_code == cond2.__code__.co_code). Users can specify that conditions should be treated as equal by providing tags to the operations.
Given a FlowProject subclass defined in a module project.py, the output graph could be visualized using Matplotlib and NetworkX with the following code:
    import numpy as np
    import networkx as nx
    from matplotlib import pyplot as plt

    from project import Project

    project = Project()
    ops = project.operations.keys()
    adj = np.asarray(project.detect_operation_graph())

    plt.figure()
    g = nx.DiGraph(adj)
    pos = nx.spring_layout(g)
    nx.draw(g, pos)
    nx.draw_networkx_labels(
        g, pos,
        labels={key: name for (key, name) in
                zip(range(len(ops)), [o for o in ops])})

    plt.show()
- Returns
The adjacency matrix of operation dependencies. A zero indicates no dependency, and a 1 indicates dependency. This can be converted to a graph using NetworkX.
- Return type
list of lists of int
- Raises
RuntimeError – If a condition does not have a tag. This can occur when using functools.partial, and a manually specified condition tag has not been set.
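The pairwise comparison described for detect_operation_graph() can be illustrated with a small standalone sketch. This is not signac-flow's implementation: the `tagged` helper, the `tag` attribute, the dictionary layout of `workflow`, and the edge direction (row = upstream operation, column = dependent operation) are all assumptions made for illustration. Conditions are compared by an explicit tag when one is set and by bytecode otherwise.

```python
def tagged(tag):
    """Hypothetical helper: attach an explicit tag to a condition function."""
    def decorator(func):
        func.tag = tag
        return func
    return decorator


def condition_tag(condition):
    # Fall back to the function's bytecode when no explicit tag was given.
    return getattr(condition, "tag", condition.__code__.co_code)


def build_adjacency(operations):
    """Return (names, matrix); matrix[i][j] == 1 if operation j depends on i."""
    names = list(operations)
    matrix = [[0] * len(names) for _ in names]
    for i, upstream in enumerate(names):
        post_tags = {condition_tag(c) for c in operations[upstream]["post"]}
        for j, downstream in enumerate(names):
            if i == j:
                continue
            pre_tags = {condition_tag(c) for c in operations[downstream]["pre"]}
            # An edge exists when a postcondition of one operation
            # matches a precondition of another.
            if post_tags & pre_tags:
                matrix[i][j] = 1
    return names, matrix


@tagged("initialized")
def initialized(job):
    return job.get("initialized", False)


@tagged("processed")
def processed(job):
    return job.get("processed", False)


# Plain dictionaries stand in for a FlowProject's registered operations.
workflow = {
    "initialize": {"pre": [], "post": [initialized]},
    "process": {"pre": [initialized], "post": [processed]},
    "analyze": {"pre": [processed], "post": []},
}
names, adjacency = build_adjacency(workflow)
```

Explicit tags are used here because two condition functions that differ only in a constant may compile to the same bytecode, which would make a purely bytecode-based comparison report them as equal.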
- get_job_status(job, ignore_errors=False, cached_status=None)[source]¶
Return status information about a job.
- Parameters
job (Job) – The signac job.
ignore_errors (bool) – Whether to ignore exceptions raised during status check. (Default value = False)
cached_status (dict) – Dictionary of cached status information. The keys are uniquely generated ids for each group and job. The values are instances of JobStatus. (Default value = None)
- Returns
A dictionary containing status for the requested job.
- Return type
- property groups¶
Get the dictionary of groups that have been added to the workflow.
- classmethod label(label_name_or_func=None)[source]¶
Designate a function as a label function for this class.
For example, we can define a label function like this:
    @FlowProject.label
    def foo_label(job):
        if job.document.get('foo', False):
            return 'foo-label-text'
The foo-label-text label will now show up in the status view for each job where the foo key evaluates true.
If the label function returns any type other than str, the label name will be the name of the function if and only if the return value evaluates to True, for example:
    @FlowProject.label
    def foo_label(job):
        return job.document.get('foo', False)
Finally, specify a label name by providing it as the first argument to the label() decorator.
- Parameters
label_name_or_func (str or callable) – A label name or callable. (Default value = None)
- Returns
A decorator for the label function.
- Return type
callable
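The naming rules for label functions can be summarized in a few lines of plain Python. The sketch below is only an illustration of the rules stated above, not the library's code; `resolve_label` is a hypothetical helper and plain dictionaries stand in for job documents.

```python
def resolve_label(label_func, job, name=None):
    """Return the label text shown for a job, or None if no label applies."""
    value = label_func(job)
    if isinstance(value, str):
        # A string return value is used as the label text directly.
        return value
    if value:
        # Any other truthy value selects the explicit name, if one was
        # given, and the function name otherwise.
        return name if name is not None else label_func.__name__
    return None


def foo_label(job):
    if job.get("foo", False):
        return "foo-label-text"


def bar(job):
    return job.get("bar", False)
```

For a job document `{"foo": True}` the first function yields `foo-label-text`, while `bar` yields its own name (or the explicit name passed to the decorator) whenever the `bar` key is truthy.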
- labels(job)[source]¶
Yield all labels for the given job.
See also: label().
- Parameters
job (signac.contrib.job.Job) – Job handle.
- Yields
str – Label value.
- main(parser=None)[source]¶
Call this function to use the main command line interface.
In most cases one would want to call this function as part of the class definition:
    # my_project.py
    from flow import FlowProject

    class MyProject(FlowProject):
        pass

    if __name__ == '__main__':
        MyProject().main()
The project can then be executed on the command line:
$ python my_project.py --help
- Parameters
parser (argparse.ArgumentParser) – The argument parser used to implement the command line interface. If None, a new parser is constructed. (Default value = None)
- classmethod make_group(name, options='', group_aggregator=None)[source]¶
Make a FlowGroup named name and return a decorator to make groups.
A FlowGroup is used to group operations together for running and submitting _JobOperations.
Examples
The code below creates a group and adds an operation to that group.
    example_group = FlowProject.make_group('example')

    @example_group
    @FlowProject.operation
    def foo(job):
        return "hello world"
- Parameters
options (str) – A string to append to submissions. Can be any valid FlowOperation.run() option. (Default value = "")
group_aggregator (aggregator) – An instance of aggregator to associate with the FlowGroup. If None, no aggregation takes place (Default value = None).
- Returns
The created group.
- Return type
- property operations¶
Get the dictionary of operations that have been added to the workflow.
- print_status(jobs=None, overview=True, overview_max_lines=None, detailed=False, parameters=None, param_max_width=None, expand=False, all_ops=False, only_incomplete=False, dump_json=False, unroll=True, compact=False, pretty=False, file=None, err=None, ignore_errors=False, template=None, profile=False, eligible_jobs_max_lines=None, output_format='terminal')[source]¶
Print the status of the project.
- Parameters
jobs (iterable of Job or aggregates) – If None, print status for all jobs/aggregates. If not None, only print status for the given jobs or aggregates (Default value = None).
overview (bool) – Display an overview of the project status. (Default value = True)
overview_max_lines (int) – Limit the number of overview lines. (Default value = None)
detailed (bool) – Print a detailed status of each job. (Default value = False)
parameters (list of str) – Print the value of the specified parameters. (Default value = None)
param_max_width (int) – Limit the number of characters of parameter columns. (Default value = None)
expand (bool) – Present labels and operations in two separate tables. (Default value = False)
all_ops (bool) – Include operations that are not eligible to run. (Default value = False)
only_incomplete (bool) – Only show jobs that have eligible operations. (Default value = False)
dump_json (bool) – Output the data as JSON instead of printing the formatted output. (Default value = False)
unroll (bool) – Separate columns for jobs and the corresponding operations. (Default value = True)
compact (bool) – Print a compact version of the output. (Default value = False)
pretty (bool) – Prettify the output. (Default value = False)
file (str) – Redirect all output to this file, defaults to sys.stdout.
err (str) – Redirect all error output to this file, defaults to sys.stderr.
ignore_errors (bool) – Print status even if querying the scheduler fails. (Default value = False)
template (str) – User provided Jinja2 template file. (Default value = None)
profile (bool) – Show profile result. (Default value = False)
eligible_jobs_max_lines (int) – Limit the number of operations, and their eligible job counts, printed in the overview. (Default value = None)
output_format (str) – Status output format, supports: ‘terminal’ (default), ‘markdown’ or ‘html’.
- property project_hooks¶
hooks.Hooks defined for all project operations.
Project-wide hooks are added to an instance of the FlowProject, not the class. For example:
    def finish_hook(operation_name, job):
        print(f"Finished operation {operation_name} on job {job.id}")

    if __name__ == "__main__":
        project = FlowProject()
        project.project_hooks.on_finish.append(finish_hook)
        project.main()
- run(jobs=None, names=None, pretend=False, np=None, timeout=None, num=None, num_passes=1, progress=False, order=None, ignore_conditions=IgnoreConditions.NONE)[source]¶
Execute all eligible operations for the given selection.
This function will run in an infinite loop until all eligible operations are executed, unless it reaches the maximum number of passes per operation or the maximum number of executions.
By default there is no limit on the total number of executions, but a specific operation will only be executed once per job. This is to avoid accidental infinite loops when no or faulty postconditions are provided.
- Parameters
jobs (iterable of Job or aggregates) – If None, execute operations for all eligible jobs/aggregates. If not None, only execute operations for the given jobs or aggregates (Default value = None).
names (iterable of str) – Only execute operations that match the provided set of names (interpreted as regular expressions), or all if the argument is None. (Default value = None)
pretend (bool) – Do not actually execute the operations, but show the commands that would have been executed. (Default value = False)
np (int) – Parallelize to the specified number of processors. Use -1 to parallelize over all available processors. The value None uses one processor (Default value = None).
timeout (float) – An optional timeout for each operation in seconds after which execution will be cancelled. Use None to indicate no timeout (Default value = None).
num (int) – The total number of operations that are executed will not exceed this argument if provided. (Default value = None)
num_passes (int or None) – The total number of executions of one specific job-operation pair will not exceed this argument. The default is 1; there is no limit if this argument is None.
progress (bool) – Show a progress bar during execution. (Default value = False)
order (str, callable, or None) – Specify the order of operations. Possible values are:
'none' or None (no specific order)
'by-job' (operations are grouped by job)
'cyclic' (order operations cyclic by job)
'random' (shuffle the execution order randomly)
callable (a callable returning a comparison key for an operation used to sort operations)
The default value is 'none', which is equivalent to 'by-job' in the current implementation.
Note
Users are advised to not rely on a specific execution order as a substitute for defining the workflow in terms of preconditions and postconditions. However, a specific execution order may be more performant in cases where operations need to access and potentially lock shared resources.
ignore_conditions (IgnoreConditions) – Specify if preconditions and/or postconditions are to be ignored when determining eligibility. The default is IgnoreConditions.NONE.
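The interplay between num and num_passes in run() can be pictured with a simplified model of the execution loop. The sketch below is an assumption-laden illustration, not the actual implementation: `run_sketch`, the `(is_eligible, execute)` tuples, and the use of dictionaries as jobs are hypothetical. It shows why an operation with a faulty (always unmet) postcondition still runs only once per job by default.

```python
from collections import Counter


def run_sketch(jobs, operations, num=None, num_passes=1):
    """Repeat passes over all eligible job-operation pairs until none remain.

    jobs: mapping of job id -> job (here: a plain dict).
    operations: mapping of name -> (is_eligible, execute) callables.
    Note: with num=None and num_passes=None, an always-eligible operation
    would loop forever, which is what the default num_passes=1 guards against.
    """
    executed = []
    passes = Counter()  # executions per (job id, operation name) pair
    while True:
        pending = [
            (job_id, name)
            for job_id, job in jobs.items()
            for name, (is_eligible, _) in operations.items()
            if is_eligible(job)
            and (num_passes is None or passes[job_id, name] < num_passes)
        ]
        if not pending:
            return executed
        for job_id, name in pending:
            if num is not None and len(executed) >= num:
                return executed  # global execution limit reached
            _, execute = operations[name]
            execute(jobs[job_id])
            passes[job_id, name] += 1
            executed.append((job_id, name))


def increment(job):
    job["runs"] += 1


# An operation without a working postcondition: it is always eligible.
operations = {"count": (lambda job: True, increment)}

jobs = {"a": {"runs": 0}, "b": {"runs": 0}}
first = run_sketch(jobs, operations)
```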
- scheduler_jobs(scheduler)[source]¶
Fetch jobs from the scheduler.
This function will fetch all scheduler jobs from the scheduler and also expand bundled jobs automatically.
However, this function will not automatically filter scheduler jobs which are not associated with this project.
- Parameters
scheduler (Scheduler) – The scheduler instance.
- Yields
ClusterJob – All cluster jobs fetched from the scheduler.
- submit(bundle_size=1, jobs=None, names=None, num=None, parallel=False, force=False, ignore_conditions=IgnoreConditions.NONE, ignore_conditions_on_execution=IgnoreConditions.NONE, **kwargs)[source]¶
Submit function for the project’s main submit interface.
- Parameters
bundle_size (int) – Specify the number of operations to be bundled into one submission, defaults to 1.
jobs (iterable of Job or aggregates) – If None, submit operations for all eligible jobs/aggregates. If not None, only submit operations for the given jobs or aggregates (Default value = None).
names (iterable of str) – Only submit operations that match the provided set of names (interpreted as regular expressions), or all if the argument is None. (Default value = None)
num (int) – Limit the total number of submitted operations, defaults to no limit.
parallel (bool) – Execute all bundled operations in parallel. (Default value = False)
force (bool) – Ignore all warnings or checks during submission, just submit. (Default value = False)
ignore_conditions (IgnoreConditions) – Specify if preconditions and/or postconditions are to be ignored when determining eligibility. The default is IgnoreConditions.NONE.
ignore_conditions_on_execution (IgnoreConditions) – Specify if preconditions and/or postconditions are to be ignored when determining eligibility after submitting. The default is IgnoreConditions.NONE.
**kwargs – Additional keyword arguments forwarded to submit().
- FlowProject.operation(func, name=None)¶
Add operation functions to the class workflow definition.
This object is designed to be used as a decorator, for example:
    @FlowProject.operation
    def hello(job):
        print('Hello', job)
Directives can also be specified by using FlowProject.operation.with_directives().
    @FlowProject.operation.with_directives({"nranks": 4})
    def mpi_hello(job):
        print("hello")
- Parameters
func (callable) – The function to add to the workflow.
name (str) – The operation name. Uses the name of the function if None. (Default value = None)
- Returns
The operation function.
- Return type
callable
- operation.with_directives(directives, name=None)¶
Decorate a function to make it an operation with additional execution directives.
Directives can be used to provide information about required resources such as the number of processors required for execution of parallelized operations. For more information, see Submission Directives. To apply directives to an operation that is part of a group, use FlowGroupEntry.with_directives().
- Parameters
- Returns
A decorator which registers the function with the provided name and directives as an operation of the FlowProject subclass.
- Return type
function
Supported Directives:
executable
Return the path to the executable to be used for an operation.
The executable directive expects a string pointing to a valid executable file in the current file system.
When called, by default this should point to a Python executable (interpreter); however, if the FlowProject path is an empty string, the executable can be a path to an executable Python script. Defaults to sys.executable.
fork
The fork directive can be set to True to enforce that a particular operation is always executed within a subprocess and not within the Python interpreter’s process even if there are no other reasons that would prevent that.
Note
Setting fork=False will not prevent forking if there are other reasons for forking, such as a timeout.
memory
The memory to request for this operation.
The memory to validate should be either a float, int, or string. A valid memory argument is defined as:
Positive numeric value with suffix “g” or “G” indicating memory requested in gigabytes.
For example:
    @Project.operation.with_directives({"memory": "4g"})
    def op(job):
        pass
Positive numeric value with suffix “m” or “M” indicating memory requested in megabytes.
For example:
    @Project.operation.with_directives({"memory": "512m"})
    def op(job):
        pass
Positive numeric value with no suffix indicating memory requested in gigabytes.
For example:
    @Project.operation.with_directives({"memory": "4"})
    def op1(job):
        pass

    @Project.operation.with_directives({"memory": 4})
    def op2(job):
        pass
ngpu
The number of GPUs to use for this operation.
Expects a nonnegative integer. Defaults to 0.
np
The number of tasks to launch for a given operation i.e., the number of CPU cores to be requested for a given operation.
Expects a natural number (i.e. an integer >= 1). This directive introspects into the “nranks” or “omp_num_threads” directives and uses their product if it is greater than the current set value. Defaults to 1.
nranks
The number of MPI ranks to use for this operation. Defaults to 0.
Expects a nonnegative integer.
omp_num_threads
The number of OpenMP threads to use for this operation. Defaults to 0.
Expects a nonnegative integer.
processor_fraction
Fraction of a resource to use on a single operation.
If set to 0.5 for a bundled job with 20 operations (all with ‘np’ set to 1), 10 CPUs will be used. Defaults to 1.
Note
This can be particularly useful on Stampede2’s launcher.
walltime
The number of hours to request for executing this job.
This directive expects a float representing the walltime in hours. Fractional values are supported. For example, a value of 0.5 will request 30 minutes of walltime. If no walltimes are requested, the submission will not specify a walltime in the output script. Some schedulers have a default value that will be used.
For example:
    @Project.operation.with_directives({"walltime": 24})
    def op(job):
        # This operation takes 1 day to run
        pass
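To make the directive rules above more concrete, here is a rough standalone sketch of how the memory, np, and walltime values could be interpreted. None of these helpers exist in signac-flow under these names. The factor of 1024 megabytes per gigabyte, the treatment of an unset (0) nranks or omp_num_threads as 1 when forming the product, and the HH:MM:SS output format are assumptions made for illustration only.

```python
from datetime import timedelta


def parse_memory_gb(value):
    """Interpret a memory directive and return the request in gigabytes."""
    if isinstance(value, (int, float)):
        amount, unit = float(value), "g"  # bare numbers mean gigabytes
    else:
        text = str(value).strip()
        if text and text[-1] in "gGmM":
            amount, unit = float(text[:-1]), text[-1].lower()
        else:
            amount, unit = float(text), "g"  # no suffix: gigabytes
    if amount <= 0:
        raise ValueError("memory must be a positive value")
    # Assumption: 1 GB corresponds to 1024 MB.
    return amount if unit == "g" else amount / 1024


def effective_np(np=1, nranks=0, omp_num_threads=0):
    """Use nranks * omp_num_threads when that exceeds the explicit np."""
    # Assumption: an unset (0) directive counts as 1 in the product.
    product = max(nranks, 1) * max(omp_num_threads, 1)
    return max(np, product)


def walltime_to_hms(hours):
    """Convert a walltime in (possibly fractional) hours to HH:MM:SS."""
    total_seconds = int(round(timedelta(hours=hours).total_seconds()))
    h, remainder = divmod(total_seconds, 3600)
    m, s = divmod(remainder, 60)
    return f"{h:02d}:{m:02d}:{s:02d}"
```

Under these assumptions, "512m" corresponds to half a gigabyte, an operation with 4 MPI ranks and 2 OpenMP threads requests 8 cores unless np is set higher, and a walltime of 0.5 becomes 00:30:00.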
- FlowProject.operation_hooks(hook_func, trigger)¶
Add hooks to an operation.
This object is designed to be used as a decorator. The example below shows an operation level decorator that prints the operation name and job id at the start of the operation execution.
    def start_hook(operation_name, job):
        print(f"Starting operation {operation_name} on job {job.id}.")

    @FlowProject.operation_hooks.on_start(start_hook)
    @FlowProject.operation
    def foo(job):
        pass
A hook is a function that is called at specific points during the execution of a job operation. In the example above, the start_hook hook function is executed before the operation foo runs. Hooks can also run after an operation finishes, when an operation exits with error, or when an operation exits without error.
The available triggers are on_start, on_finish, on_fail, and on_success, which run when the operation starts, completes, fails, and succeeds, respectively.
- Parameters
hook_func (callable) – The function that will be executed at a specified point.
trigger (string) – The point when a hook operation is executed.
- classmethod operation_hooks.on_fail(hook_func)¶
Add a hook function triggered after the operation exits with an error.
- classmethod operation_hooks.on_finish(hook_func)¶
Add a hook function triggered after the operation exits.
The hook is triggered regardless of whether the operation exits with or without an error.
- classmethod operation_hooks.on_start(hook_func)¶
Add a hook function triggered before an operation starts.
- classmethod operation_hooks.on_success(hook_func)¶
Add a hook function triggered after the operation exits without error.
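The four triggers map naturally onto a try/except/else/finally block. The following sketch only illustrates when each trigger fires; `run_with_hooks` is a hypothetical helper, and both the relative order of on_finish versus on_fail/on_success and the `(operation_name, error, job)` signature of the failure hook are assumptions rather than documented behavior.

```python
def run_with_hooks(operation, job, on_start=(), on_finish=(),
                   on_fail=(), on_success=()):
    """Run operation(job), calling the hook lists at the matching points."""
    name = operation.__name__
    for hook in on_start:
        hook(name, job)
    try:
        operation(job)
    except Exception as error:
        for hook in on_fail:
            hook(name, error, job)  # assumed signature for failure hooks
        raise
    else:
        for hook in on_success:
            hook(name, job)
    finally:
        # Runs whether the operation succeeded or raised.
        for hook in on_finish:
            hook(name, job)


events = []
hooks = {
    "on_start": [lambda name, job: events.append(f"start:{name}")],
    "on_finish": [lambda name, job: events.append(f"finish:{name}")],
    "on_fail": [lambda name, error, job: events.append(f"fail:{name}")],
    "on_success": [lambda name, job: events.append(f"success:{name}")],
}


def succeed(job):
    events.append("run")


def broken(job):
    raise RuntimeError("boom")


run_with_hooks(succeed, "job-1", **hooks)
try:
    run_with_hooks(broken, "job-1", **hooks)
except RuntimeError:
    pass  # the error still propagates after the hooks have run
```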
- FlowProject.post(tag=None)¶
Define and evaluate postconditions for operations.
A postcondition is a function accepting one or more jobs as positional arguments (*jobs) that must evaluate to True for this operation to be considered complete. For example:
    @Project.operation
    @Project.post(lambda job: job.doc.get('bye'))
    def bye(job):
        print('bye', job)
        job.doc.bye = True

    @Project.operation
    @aggregator()
    @Project.post(lambda *jobs: all("bye_all" in job.doc for job in jobs))
    def bye_all(*jobs):
        print('bye', jobs)
        for job in jobs:
            job.doc.bye_all = True
The bye operation would be considered complete and therefore no longer eligible for execution once the ‘bye’ key in the job document evaluates to True. Similarly, the bye_all operation would be considered complete and therefore no longer eligible for execution only if the ‘bye_all’ key is present in all of the jobs passed.
An optional tag may be associated with the condition. These tags are used by detect_operation_graph() when comparing conditions for equality. The tag defaults to the bytecode of the function.
- classmethod post.copy_from(*other_funcs)¶
Copy postconditions from other operation(s).
True if and only if all postconditions of other operation function(s) are met.
- classmethod post.false(key)¶
Evaluate if a document key is False for the job(s).
Returns True if the specified key is present in the job document(s) and evaluates to False.
- classmethod post.isfile(filename)¶
Determine if the specified file exists for the job(s).
- classmethod post.never(func)¶
Return False.
- classmethod post.not_(condition)¶
Return not condition(*jobs) for the provided condition function.
- classmethod post.true(key)¶
Evaluate if a document key is True for the job(s).
Returns True if the specified key is present in the job document(s) and evaluates to True.
- FlowProject.pre(tag=None)¶
Define and evaluate preconditions for operations.
A precondition is a function accepting one or more jobs as positional arguments (*jobs) that must evaluate to True for this operation to be eligible for execution. For example:
    @Project.operation
    @Project.pre(lambda job: not job.doc.get('hello'))
    def hello(job):
        print('hello', job)
        job.doc.hello = True

    @Project.operation
    @aggregator()
    @Project.pre(lambda *jobs: all("hi_all" not in job.doc for job in jobs))
    def hi_all(*jobs):
        print('hi', jobs)
        for job in jobs:
            job.doc.hi_all = True
The hello operation would only execute if the 'hello' key in the job document does not evaluate to True. Similarly, the hi_all operation would execute only if the 'hi_all' key is absent from the document of every job passed.
An optional tag may be associated with the condition. These tags are used by detect_operation_graph() when comparing conditions for equality. The tag defaults to the bytecode of the function.
- classmethod pre.after(*other_funcs)¶
Precondition to run an operation after other operations.
True if and only if all postconditions of other operation function(s) are met.
- classmethod pre.copy_from(*other_funcs)¶
Copy preconditions from other operation(s).
True if and only if all preconditions of other operation function(s) are met.
- classmethod pre.false(key)¶
Evaluate if a document key is False for the job(s).
Returns True if the specified key is present in the job document(s) and evaluates to False.
- classmethod pre.isfile(filename)¶
Determine if the specified file exists for the job(s).
- classmethod pre.never(func)¶
Return False.
- classmethod pre.not_(condition)¶
Return not condition(*jobs) for the provided condition function.
- classmethod pre.true(key)¶
Evaluate if a document key is True for the job(s).
Returns True if the specified key is present in the job document(s) and evaluates to True.
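The document-key conditions (true, false, and not_) share the same semantics for preconditions and postconditions. As a hedged sketch of those semantics, the functions below build equivalent condition callables in plain Python; `doc_true`, `doc_false`, and `negate` are hypothetical names, and dictionaries stand in for job documents.

```python
def doc_true(key):
    """Condition: the key is present and truthy for every job."""
    return lambda *jobs: all(bool(job.get(key, False)) for job in jobs)


def doc_false(key):
    """Condition: the key is present and falsy for every job."""
    # A missing key defaults to True here, so the condition is not met.
    return lambda *jobs: all(not job.get(key, True) for job in jobs)


def negate(condition):
    """Condition: the logical negation of another condition."""
    return lambda *jobs: not condition(*jobs)


done = {"converged": True}
pending = {"converged": False}
empty = {}

converged = doc_true("converged")
not_converged = doc_false("converged")
```

Note that `doc_false` is not simply the negation of `doc_true`: for a document without the key, both conditions evaluate to False, whereas `negate(doc_true("converged"))` evaluates to True.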
Operations and Status
- class flow.project.BaseFlowOperation(pre=None, post=None)[source]¶
Bases: object
A BaseFlowOperation represents a data space operation acting on any job.
Every BaseFlowOperation is associated with a specific command.
Preconditions (pre) and postconditions (post) can be used to trigger an operation only when certain conditions are met. Conditions are unary callables, which expect an instance of job as their first and only positional argument and return either True or False.
An operation is considered “eligible” for execution when all preconditions are met and when at least one of the postconditions is not met. Preconditions are always met when the list of preconditions is empty. Postconditions are never met when the list of postconditions is empty.
Note
This class should not be instantiated directly.
- Parameters
pre (sequence of callables) – List of preconditions.
post (sequence of callables) – List of postconditions.
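The eligibility rule stated for BaseFlowOperation can be written down directly. This is a minimal sketch of the rule as described, not the class's actual code; `is_eligible` is a hypothetical function and dictionaries stand in for jobs.

```python
def is_eligible(job, pre=(), post=()):
    """Eligible: all preconditions met and at least one postcondition unmet."""
    # all() over an empty sequence is True: no preconditions means "met".
    preconditions_met = all(condition(job) for condition in pre)
    # An empty postcondition list is treated as never met.
    postconditions_met = len(post) > 0 and all(condition(job) for condition in post)
    return preconditions_met and not postconditions_met


def has_input(job):
    return "input" in job


def has_output(job):
    return "output" in job
```

With these conditions, a job lacking its input is not eligible, a job with input but no output is eligible, and a job with both is complete. An operation without any conditions is always eligible.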
- class flow.project.FlowOperation(op_func, pre=None, post=None)[source]¶
Bases: flow.project.BaseFlowOperation
An operation that executes a Python function.
All operations without the @cmd directive use this class. The callable op_func should be a function of one or more instances of Job.
Note
This class should not be instantiated directly.
- Parameters
op_func (callable) – A callable function of *jobs.
pre (sequence of callables) – List of preconditions.
post (sequence of callables) – List of postconditions.
- class flow.project.FlowCmdOperation(cmd, pre=None, post=None)[source]¶
Bases: flow.project.BaseFlowOperation
An operation that executes a shell command.
When an operation has the @cmd directive specified, it is instantiated as a FlowCmdOperation. The operation should be a function of one or more positional arguments that are instances of Job. The command (cmd) may either be a callable that expects one or more instances of Job as positional arguments and returns a string containing valid shell commands, or the string of commands itself. In either case, the resulting string may contain any attributes of the job (or jobs) placed in curly braces, which will then be substituted by Python string formatting.
Note
This class should not be instantiated directly.
- Parameters
cmd (str or callable) – The command to execute the operation. Callable values will be provided one or more positional arguments (*jobs) that are instances of Job. String values will be formatted with cmd.format(jobs=jobs) where jobs is a tuple of Job, or cmd.format(jobs=jobs, job=job) if only one job is provided.
pre (sequence of callables) – List of preconditions.
post (sequence of callables) – List of postconditions.
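The formatting step described for the cmd parameter can be reproduced with ordinary string formatting. In the sketch below, `render_command` is a hypothetical helper and `SimpleNamespace` objects with made-up `id` and `ws` values stand in for signac jobs; only the two `format` call shapes are taken from the description above.

```python
from types import SimpleNamespace


def render_command(cmd, *jobs):
    """Resolve a command template for one or more job-like objects."""
    if callable(cmd):
        cmd = cmd(*jobs)  # callables return the command string
    if len(jobs) == 1:
        return cmd.format(jobs=jobs, job=jobs[0])
    return cmd.format(jobs=jobs)


# Stand-ins for jobs; the id and workspace values are invented.
job_a = SimpleNamespace(id="abc123", ws="/tmp/workspace/abc123")
job_b = SimpleNamespace(id="def456", ws="/tmp/workspace/def456")

single = render_command("echo {job.id}", job_a)
from_callable = render_command(lambda job: "ls {job.ws}", job_a)
aggregate = render_command("echo {jobs[0].id} {jobs[1].id}", job_a, job_b)
```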
Labels
- class flow.label(name=None)[source]¶
Bases: object
Decorate a FlowProject class function as a label function.
For example:
    class MyProject(FlowProject):

        @label()
        def foo(self, job):
            return True
- class flow.staticlabel(name=None)[source]¶
Bases: flow.labels.label
A label decorator for staticmethods.
This decorator implies “staticmethod”!
- class flow.classlabel(name=None)[source]¶
Bases: flow.labels.label
A label decorator for classmethods.
This decorator implies “classmethod”!
@flow.cmd
- flow.cmd(func)[source]¶
Indicate that func returns a shell command with this decorator.
If this function is an operation function defined by FlowProject, it will be interpreted to return a shell command, instead of executing the function itself.
For example:
    @FlowProject.operation
    @flow.cmd
    def hello(job):
        return "echo {job.id}"
@flow.with_job
- flow.with_job(func)[source]¶
Use arg as a context manager for func(arg) with this decorator.
This decorator can only be used for operations that accept a single job as a parameter.
If this function is an operation function defined by FlowProject, it will be the same as using with job:.
For example:
    @FlowProject.operation
    @flow.with_job
    def hello(job):
        print("hello {}".format(job))
Is equivalent to:
    @FlowProject.operation
    def hello(job):
        with job:
            print("hello {}".format(job))
This also works with the @cmd decorator:
    @FlowProject.operation
    @with_job
    @cmd
    def hello(job):
        return "echo 'hello {}'".format(job)
Is equivalent to:
    @FlowProject.operation
    @cmd
    def hello_cmd(job):
        return 'trap "cd `pwd`" EXIT && cd {} && echo "hello {job}"'.format(job.ws, job=job)
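The effect of wrapping an operation in `with job:` can be demonstrated without signac. The sketch below assumes, as the shell equivalent above suggests, that entering the job context changes into the job's workspace directory and that leaving it restores the previous directory. `FakeJob` and `with_job_sketch` are hypothetical stand-ins, not library classes.

```python
import functools
import os
import tempfile


class FakeJob:
    """Stand-in for a job whose context manager enters its workspace."""

    def __init__(self, workspace):
        self.ws = workspace

    def __enter__(self):
        self._previous = os.getcwd()
        os.chdir(self.ws)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        os.chdir(self._previous)  # always return to the original directory
        return False


def with_job_sketch(func):
    """Call func(job) inside the job's context."""
    @functools.wraps(func)
    def wrapper(job):
        with job:
            return func(job)
    return wrapper


@with_job_sketch
def where_am_i(job):
    return os.getcwd()


start_dir = os.getcwd()
with tempfile.TemporaryDirectory() as tmp:
    inside = where_am_i(FakeJob(tmp))
    # realpath guards against symlinked temporary directories.
    ran_in_workspace = os.path.realpath(inside) == os.path.realpath(tmp)
restored = os.getcwd() == start_dir
```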
@flow.directives
- class flow.directives(**kwargs)[source]¶
Bases: object
Decorator for operation functions to provide additional execution directives.
Directives can for example be used to provide information about required resources such as the number of processes required for execution of parallelized operations. For more information, read about Submission Directives.
Deprecated since version 0.15: This decorator is deprecated and will be removed in 1.0. Use FlowProject.operation.with_directives instead.
Supported Directives:
executable
Return the path to the executable to be used for an operation.
The executable directive expects a string pointing to a valid executable file in the current file system.
When called, by default this should point to a Python executable (interpreter); however, if the
FlowProject
path is an empty string, the executable can be a path to an executable Python script. Defaults tosys.executable
.fork
The fork directive can be set to True to enforce that a particular operation is always executed within a subprocess and not within the Python interpreter’s process even if there are no other reasons that would prevent that.
Note
Setting
fork=False
will not prevent forking if there are other reasons for forking, such as a timeout.memory
The memory to request for this operation.
The memory to validate should be either a float, int, or string. A valid memory argument is defined as:
Positive numeric value with suffix “g” or “G” indicating memory requested in gigabytes.
For example:
@Project.operation.with_directives({"memory": "4g"}) def op(job): pass
Positive numeric value with suffix “m” or “M” indicating memory requested in megabytes.
For example:
@Project.operation.with_directives({"memory": "512m"}) def op(job): pass
Positive numeric value with no suffix indicating memory requested in gigabytes.
For example:
@Project.operation.with_directives({"memory": "4"}) def op1(job): pass @Project.operation.with_directives({"memory": 4}) def op2(job): pass
ngpu
The number of GPUs to use for this operation.
Expects a nonnegative integer. Defaults to 0.
np
The number of tasks to launch for a given operation, i.e., the number of CPU cores to be requested for that operation.
Expects a natural number (i.e. an integer >= 1). This directive introspects into the “nranks” or “omp_num_threads” directives and uses their product if it is greater than the current set value. Defaults to 1.
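The relationship between np, nranks, and omp_num_threads described above can be sketched as follows. The function name `resolve_np` is hypothetical, and treating a value of 0 as "not set" (so that it counts as 1 in the product) is an assumption made for this illustration, not documented behavior.

```python
def resolve_np(np=1, nranks=0, omp_num_threads=0):
    """Return the effective number of tasks for an operation (illustrative)."""
    # Assumption: 0 means "not set" and contributes a factor of 1.
    product = max(nranks, 1) * max(omp_num_threads, 1)
    # The product replaces np only if it is greater than the value set for np.
    return max(np, product)
```

For example, 4 MPI ranks with 2 OpenMP threads each would yield 8 tasks unless np was explicitly set to a larger value.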
nranks
The number of MPI ranks to use for this operation. Defaults to 0.
Expects a nonnegative integer.
omp_num_threads
The number of OpenMP threads to use for this operation. Defaults to 0.
Expects a nonnegative integer.
processor_fraction
Fraction of a resource to use on a single operation.
If set to 0.5 for a bundled job with 20 operations (all with ‘np’ set to 1), 10 CPUs will be used. Defaults to 1.
Note
This can be particularly useful on Stampede2’s launcher.
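The arithmetic behind the processor_fraction example above can be written out as a short sketch. The helper `bundled_cpus` is hypothetical, and rounding fractional results up to a whole CPU is an assumption of this illustration.

```python
import math


def bundled_cpus(np_per_operation, processor_fraction=1.0):
    """Estimate the CPUs used by a bundle of operations (illustrative)."""
    total = sum(np_per_operation)
    # Assumption: a fractional CPU count is rounded up to the next integer.
    return math.ceil(total * processor_fraction)
```

With 20 operations that each set np to 1 and a fraction of 0.5, this yields the 10 CPUs mentioned above.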
walltime
The number of hours to request for executing this job.
This directive expects a float representing the walltime in hours. Fractional values are supported. For example, a value of 0.5 will request 30 minutes of walltime. If no walltimes are requested, the submission will not specify a walltime in the output script. Some schedulers have a default value that will be used.
For example:
@Project.operation.with_directives({"walltime": 24})
def op(job):
    # This operation takes 1 day to run
    pass
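Because the walltime is given in (possibly fractional) hours, a submission script typically needs it in another form. The sketch below converts hours to an `HH:MM:SS` string; the helper `format_walltime` is hypothetical and the output format is an assumption about what a scheduler might expect, not something this reference specifies.

```python
def format_walltime(hours):
    """Format a walltime given in hours as HH:MM:SS (illustrative)."""
    if hours is None:
        return None  # no walltime requested, so nothing is written
    total_seconds = round(hours * 3600)
    whole_hours, remainder = divmod(total_seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    return f"{whole_hours:02d}:{minutes:02d}:{seconds:02d}"
```

A value of 0.5 corresponds to 30 minutes, and the value 24 from the example above corresponds to one day.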
flow.init()¶
- flow.init(alias=None, template=None, root=None)[source]¶
Initialize a templated
FlowProject
module.
- Parameters
alias (str) – Python identifier used as a file name for the template output. Uses
"project"
if None. (Default value = None)
template (str) –
Name of the template to use. Built-in templates are:
"minimal"
"example"
"testing"
Uses
"minimal"
if None. (Default value = None)
root (str) – Directory where the output file is placed. Uses the current working directory if None. (Default value = None)
- Returns
List of file names created.
- Return type
flow.get_environment()¶
- flow.get_environment(test=False, import_configured=True)[source]¶
Attempt to detect the present environment.
This function iterates through all defined
ComputeEnvironment
classes in reversed order of definition and returns the first environment where the
is_present()
method returns True.
- Parameters
- Returns
The detected environment class.
- Return type
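The detection order described for get_environment() can be illustrated with stand-in classes. Everything in this sketch is hypothetical (the class names, the `registry` list, and the `present` flag); it only demonstrates why iterating in reversed order of definition lets later, more specific environments take precedence over an always-present fallback.

```python
class BaseEnvironment:
    """Stand-in base class that records subclasses in order of definition."""

    registry = []

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        BaseEnvironment.registry.append(cls)

    @classmethod
    def is_present(cls):
        return False


class FallbackEnvironment(BaseEnvironment):
    """Defined first; always reports itself as present."""

    @classmethod
    def is_present(cls):
        return True


class ClusterEnvironment(BaseEnvironment):
    """Defined later; present only when the flag is set."""

    present = False

    @classmethod
    def is_present(cls):
        return cls.present


def detect_environment():
    """Return the first present environment, checking the newest first."""
    for env in reversed(BaseEnvironment.registry):
        if env.is_present():
            return env
    return None
```

While `ClusterEnvironment.present` is False the fallback is returned; once it is True the later-defined class wins.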
The FlowGroup¶
- class flow.project.FlowGroup(name, operations=None, operation_directives=None, options='')[source]¶
Bases:
object
A
FlowGroup
represents a subset of a workflow for a project.
A
FlowGroup
is associated with one or more instances of
BaseFlowOperation
.
Examples
In the example below, the directives will be
{'nranks': 4}
for op1 and
{'nranks': 2, 'executable': 'python3'}
for op2.
group = FlowProject.make_group(name='example_group')

@group.with_directives({"nranks": 4})
@FlowProject.operation.with_directives({"nranks": 2, "executable": "python3"})
def op1(job):
    pass

@group
@FlowProject.operation.with_directives({"nranks": 2, "executable": "python3"})
def op2(job):
    pass
- Parameters
name (str) – The name of the group to be used when calling from the command line.
operations (dict) – A dictionary of operations where the keys are operation names and each value is a
BaseFlowOperation
.
operation_directives (dict) – A dictionary of additional parameters that provide instructions on how to execute a particular operation, e.g., specifically required resources. Operation names are keys and the dictionaries of directives are values. If an operation does not have directives specified, then the directives of the singleton group containing that operation are used. To prevent this, set the directives to an empty dictionary for that operation.
options (str) – A string of options to append to the output of the object’s call method. This allows options like
--num_passes
to be given to a group.
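The fallback rule for operation_directives can be sketched with plain dictionaries. The function `directives_for` and both argument names are hypothetical; the sketch only mirrors the lookup described above, where group-specific directives are used when present (even if empty) and the operation's own directives are used otherwise.

```python
def directives_for(op_name, group_directives, operation_defaults):
    """Pick the directives used when running op_name through a group."""
    # A membership test is used so that an empty dictionary counts as
    # "specified" and therefore suppresses the fallback.
    if op_name in group_directives:
        return group_directives[op_name]
    return operation_defaults[op_name]


# Values taken from the example above.
defaults = {
    "op1": {"nranks": 2, "executable": "python3"},
    "op2": {"nranks": 2, "executable": "python3"},
}
group = {"op1": {"nranks": 4}}
```

With these values, op1 resolves to `{'nranks': 4}` and op2 falls back to its own directives, matching the example.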
- add_operation(name, operation, directives=None)[source]¶
Add an operation to the
FlowGroup
.
- Parameters
name (str) – The name of the operation.
operation (
BaseFlowOperation
) – The workflow operation to add to the
FlowGroup
.
directives (dict) – The operation specific directives. (Default value = None)
- class flow.project.FlowGroupEntry(name, project, options='', group_aggregator=None)[source]¶
Bases:
object
A FlowGroupEntry registers operations for inclusion into a
FlowGroup
.
Application developers should not directly instantiate this class, but use
make_group()
instead.
Operation functions can be marked for inclusion into a
FlowGroup
by decorating the functions with a corresponding
FlowGroupEntry
. If the operation requires specific directives,
with_directives()
accepts a dictionary of directives and returns a decorator that can be applied to the operation to mark it for inclusion in the group and indicate that it should be executed using the specified directives. This overrides the default directives specified by
flow.directives()
.
- Parameters
project (flow.FlowProject) – The project the group is associated with.
options (str) – The
FlowProject.run()
options to pass when submitting the group. These will be included in all submissions. Submissions use run commands to execute.
group_aggregator (
aggregator
) – aggregator object associated with the
FlowGroup
(Default value = None).
- __call__(func)[source]¶
Add the function into the group’s operations.
This call operator allows the class to be used as a decorator.
- Parameters
func (callable) – The function to decorate.
- Returns
The decorated function.
- Return type
callable
- with_directives(directives)[source]¶
Decorate an operation to provide additional execution directives for this group.
Directives can be used to provide information about required resources such as the number of processors required for execution of parallelized operations. For a list of supported directives, see
FlowProject.operation.with_directives()
. For more information, see Submission Directives.
The directives specified in this decorator are only applied when executing the operation through the
FlowGroup
. To apply directives to an individual operation executed outside of the group, see
FlowProject.operation.with_directives()
.
- Parameters
directives (dict) – Directives to use for resource requests and execution.
- Returns
A decorator which registers the operation with the group using the specified directives.
- Return type
function
Aggregation¶
- class flow.aggregator(aggregator_function=None, sort_by=None, sort_ascending=True, select=None)[source]¶
Bases:
object
Decorator for operation functions that operate on aggregates.
By default, if the
aggregator_function
is
None
, an aggregate of all jobs will be created.
Examples
The code block below defines a
FlowOperation
that prints the total length of the provided aggregate of jobs.
@aggregator()
@FlowProject.operation
def foo(*jobs):
    print(len(jobs))
- Parameters
aggregator_function (callable or None) – A callable that performs aggregation of jobs. It takes in a list of jobs and can return or yield subsets of jobs as an iterable. The default behavior is creating a single aggregate of all jobs.
sort_by (str, callable, or None) – Before aggregating, sort the jobs by a given state point parameter. If the argument is a string, jobs are sorted by that state point key. If the argument is callable, this will be passed as the
key
argument to
sorted()
. If None, no sorting is performed (Default value = None).
sort_ascending (bool) – True if the jobs are to be sorted in ascending order (Default value = True).
select (callable or None) – Condition for filtering individual jobs. This is passed as the
function
argument to
filter()
. If None, no filtering is performed (Default value = None).
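How the four parameters above interact can be sketched without signac-flow, using dictionaries in place of jobs. The function `make_aggregates` is hypothetical, and applying the filter before the sort is an assumption of this sketch (the result is the same either way, since both happen before aggregation).

```python
def make_aggregates(jobs, aggregator_function=None, sort_by=None,
                    sort_ascending=True, select=None):
    """Filter, sort, and aggregate jobs (dictionaries stand in for jobs)."""
    jobs = list(jobs)
    if select is not None:
        jobs = list(filter(select, jobs))
    if sort_by is not None:
        # A string is treated as a state point key, a callable as a sort key.
        key = sort_by if callable(sort_by) else (lambda job: job[sort_by])
        jobs = sorted(jobs, key=key, reverse=not sort_ascending)
    if aggregator_function is None:
        return [tuple(jobs)]  # default: a single aggregate of all jobs
    return [tuple(aggregate) for aggregate in aggregator_function(jobs)]


def pairs(jobs):
    """Example aggregator function yielding consecutive pairs of jobs."""
    for start in range(0, len(jobs), 2):
        yield jobs[start:start + 2]
```

The aggregator function receives the already filtered and sorted list, so it only has to decide how to split it.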
- __call__(func=None)[source]¶
Add this aggregator to a provided operation.
This call operator allows the class to be used as a decorator.
- Parameters
func (callable) – The function to decorate.
- classmethod groupby(key, default=None, sort_by=None, sort_ascending=True, select=None)[source]¶
Aggregate jobs according to matching state point values.
Examples
The code block below provides an example of how to aggregate jobs by a state point parameter
"key"
. If the state point does not contain the key
"key"
, a default value of -1 is used.
@aggregator.groupby(key="key", default=-1)
@FlowProject.operation
def foo(*jobs):
    print(len(jobs))
- Parameters
key (str, Iterable[str], or callable) – The method by which jobs are grouped. It may be a state point key or an iterable of state point keys whose values define the groupings. It may also be an arbitrary callable of
Job
when greater flexibility is needed.
default (Any) – Default value used for grouping if the key is missing or invalid. If
key
is an iterable, the default value must be a sequence of equal length. If
key
is a callable, this argument is ignored. If None, the provided keys must exist for all jobs (Default value = None).
sort_by (str, callable, or None) – Before aggregating, sort the jobs by a given state point parameter. If the argument is a string, jobs are sorted by that state point key. If the argument is callable, this will be passed as the
key
argument to
sorted()
. If None, no sorting is performed (Default value = None).
sort_ascending (bool) – True if the jobs are to be sorted in ascending order (Default value = True).
select (callable or None) – Condition for filtering individual jobs. This is passed as the
function
argument to
filter()
. If None, no filtering is performed (Default value = None).
- Returns
aggregator – The
groupby()
aggregator.
- Return type
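Grouping by a single state point key with a default can be sketched using `itertools.groupby` on dictionaries that stand in for jobs. The helper `group_jobs_by` is hypothetical and covers only the case where the key is a string; iterable and callable keys are not handled here.

```python
from itertools import groupby


def group_jobs_by(jobs, key, default=None):
    """Group dictionary-like jobs by the value stored under key."""

    def keyfunction(job):
        if default is None:
            return job[key]  # the key must exist for every job
        return job.get(key, default)

    # itertools.groupby only merges adjacent items, so sort by the key first.
    ordered = sorted(jobs, key=keyfunction)
    return [tuple(group) for _, group in groupby(ordered, key=keyfunction)]
```

Jobs that lack the key end up together in the group for the default value; without a default, a missing key raises a `KeyError` in this sketch.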
- classmethod groupsof(num=1, sort_by=None, sort_ascending=True, select=None)[source]¶
Aggregate jobs into groupings of a given size.
By default, creates aggregates consisting of a single job.
If the number of jobs present in the project is not divisible by the number provided by the user, the last aggregate will be smaller and contain the remaining jobs. For instance, if 10 jobs are present in a project and they are aggregated in groups of 3, then the generated aggregates will have lengths 3, 3, 3, and 1.
Examples
The code block below shows how to aggregate jobs in groups of 2.
@aggregator.groupsof(num=2)
@FlowProject.operation
def foo(*jobs):
    print(len(jobs))
- Parameters
num (int) – The default size of aggregates. The final aggregate contains the remaining jobs and may have fewer than
num
jobs.
sort_by (str, callable, or None) – Before aggregating, sort the jobs by a given state point parameter. If the argument is a string, jobs are sorted by that state point key. If the argument is callable, this will be passed as the
key
argument to
sorted()
. If None, no sorting is performed (Default value = None).
sort_ascending (bool) – True if the jobs are to be sorted in ascending order (Default value = True).
select (callable or None) – Condition for filtering individual jobs. This is passed as the
function
argument to
filter()
. If None, no filtering is performed (Default value = None).
- Returns
aggregator – The
groupsof()
aggregator.
- Return type
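The chunking behavior of groupsof(), including the smaller final aggregate, can be reproduced with list slicing. The helper `groups_of` is a hypothetical stand-in that works on any sequence of items rather than on real jobs.

```python
def groups_of(jobs, num=1):
    """Split jobs into consecutive aggregates of at most num items."""
    if num < 1:
        raise ValueError("num must be a positive integer")
    jobs = list(jobs)
    return [tuple(jobs[start:start + num]) for start in range(0, len(jobs), num)]
```

Applied to 10 items with `num=3`, the final aggregate holds the single remaining item, as described above.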
- flow.get_aggregate_id(aggregate)[source]¶
Generate aggregate id for an aggregate of jobs.
The aggregate id is a unique hash identifying a tuple of jobs. The aggregate id is sensitive to the order of the jobs in the aggregate. The id of an aggregate containing one job is that job’s id (the hash of its state point).
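The properties stated above (order sensitivity, and a single-job aggregate reusing the job id) can be illustrated with a small hash-based sketch. The function `aggregate_id`, the use of MD5, the comma separator, and the `agg-` prefix are all assumptions made for this illustration; they are not taken from this reference.

```python
import hashlib


def aggregate_id(job_ids):
    """Derive an id for an ordered collection of job ids (illustrative)."""
    job_ids = list(job_ids)
    if len(job_ids) == 1:
        return job_ids[0]  # a single-job aggregate reuses the job id
    # Joining in order makes the result depend on the order of the jobs.
    joined = ",".join(job_ids)
    return "agg-" + hashlib.md5(joined.encode("utf-8")).hexdigest()
```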
Compute Environments¶
Detection of compute environments.
This module provides the ComputeEnvironment
class, which can be
subclassed to automatically detect specific computational environments.
This enables the user to adjust their workflow based on the present environment, e.g. for the adjustment of scheduler submission scripts.
- flow.environment.setup(py_modules, **attrs)[source]¶
Set up user-defined environment modules.
Use this function in place of
setuptools.setup()
to not only install an environment’s module, but also register it with the global signac configuration. Once registered, the environment is automatically imported when the
get_environment()
function is called.
function is called.
- flow.environment.template_filter(func)[source]¶
Decorate a function as a
ComputeEnvironment
template filter.
This decorator is applied to methods defined in a subclass of
ComputeEnvironment
that are used in that environment’s templates. The decorated function becomes a class method that is available as a jinja2 filter in templates rendered by a
FlowProject
with that
ComputeEnvironment
.
- Parameters
func (callable) – Function to decorate.
- Returns
Decorated function.
- Return type
callable
- class flow.environment.ComputeEnvironment[source]¶
Bases:
object
Define computational environments.
The ComputeEnvironment class allows us to automatically determine specific environments in order to programmatically adjust workflows in different environments.
The default method for the detection of a specific environment is to provide a regular expression matching the environment’s hostname. For example, if the hostname is
my-server.com
, one could identify the environment by setting the
hostname_pattern
to
'my-server'
.
- classmethod add_args(parser)[source]¶
Add arguments related to this compute environment to an argument parser.
- Parameters
parser (
argparse.ArgumentParser
) – The argument parser where arguments will be added.
- classmethod get_config_value(key, default=<flow.util.config._GetConfigValueNoneType object>)[source]¶
Request a value from the user’s configuration.
This method should be used whenever values need to be provided that are specific to a user’s environment, e.g. account names.
When a key is not configured and no default value is provided, a
SubmitError
will be raised and the user will be prompted to add the missing key to their configuration.
Note that the key will be automatically expanded to be specific to this environment definition. For example, a key should be
'account'
, not
'MyEnvironment.account'
.
- Parameters
- Returns
The value or default value.
- Return type
- Raises
SubmitError – If the key is not in the user’s configuration and no default value is provided.
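The behavior described for get_config_value (automatic key expansion, and an error only when no default is given) can be sketched with a sentinel default. The classes below are stand-ins, not the library's: `SubmitError` mirrors the documented exception by name only, and the nested `config` dictionary is a hypothetical configuration layout.

```python
_MISSING = object()  # sentinel, so that None remains a valid default value


class SubmitError(RuntimeError):
    """Local stand-in for the documented flow.errors.SubmitError."""


class MyEnvironment:
    # Hypothetical configuration, keyed by environment class name.
    config = {"MyEnvironment": {"account": "abc123"}}

    @classmethod
    def get_config_value(cls, key, default=_MISSING):
        # The key is expanded with the class name, so callers pass
        # "account" rather than "MyEnvironment.account".
        section = cls.config.get(cls.__name__, {})
        if key in section:
            return section[key]
        if default is _MISSING:
            raise SubmitError(f"Missing configuration key '{cls.__name__}.{key}'.")
        return default
```

Using a dedicated sentinel instead of `None` lets a caller explicitly request `None` as the fallback without triggering the error.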
- classmethod get_prefix(operation, parallel=False, mpi_prefix=None, cmd_prefix=None)[source]¶
Template filter generating a command prefix from directives.
- Parameters
operation (
flow.project._JobOperation
) – The operation to be prefixed.
parallel (bool) – If True, operations are assumed to be executed in parallel, which means that the number of total tasks is the sum of all tasks instead of the maximum number of tasks. Default is set to False.
mpi_prefix (str) – User defined mpi_prefix string. Default is set to None. This will be deprecated and removed in the future.
cmd_prefix (str) – User defined cmd_prefix string. Default is set to None. This will be deprecated and removed in the future.
- Returns
The prefix to be added to the operation’s command.
- Return type
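The effect of the parallel flag on the task count, as described for get_prefix, amounts to choosing between a sum and a maximum. The helper `total_tasks` is hypothetical and takes a plain list of per-operation task counts.

```python
def total_tasks(tasks_per_operation, parallel=False):
    """Return the number of tasks needed for a set of operations."""
    tasks = list(tasks_per_operation)
    if not tasks:
        return 0
    # In parallel all operations run at once, so their tasks add up;
    # otherwise only the largest single operation determines the count.
    return sum(tasks) if parallel else max(tasks)
```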
- classmethod get_scheduler()[source]¶
Return an environment-specific scheduler driver.
The returned scheduler class provides a standardized interface to different scheduler implementations.
- classmethod is_present()[source]¶
Determine whether this specific compute environment is present.
The default method for environment detection is trying to match a hostname pattern or delegate the environment check to the associated scheduler type.
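Hostname-based detection can be sketched with the standard `re` module. The function `hostname_matches` is hypothetical, and using `re.match` (which anchors the pattern at the start of the hostname) is an assumption about how such a pattern would be applied.

```python
import re


def hostname_matches(hostname_pattern, hostname):
    """Return True if the hostname matches the regular expression."""
    # Assumption: the pattern is matched from the start of the hostname.
    return re.match(hostname_pattern, hostname) is not None
```

In practice the hostname would come from the running machine, for example via `socket.getfqdn()`; it is passed in explicitly here so that the sketch stays deterministic.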
- classmethod submit(script, flags=None, *args, **kwargs)[source]¶
Submit a job submission script to the environment’s scheduler.
Scripts should be submitted to the environment instead of directly to the scheduler, to allow for environment-specific post-processing.
- Parameters
- Returns
Status of job, if submitted.
- Return type
JobStatus.submitted or None
- class flow.environment.StandardEnvironment[source]¶
Bases:
flow.environment.ComputeEnvironment
Default environment which is always present.
- class flow.environment.TestEnvironment[source]¶
Bases:
flow.environment.ComputeEnvironment
Environment used for testing.
The test environment will print a mocked submission script and submission commands to screen. This enables testing of the job submission script generation in environments without a real scheduler.
- scheduler_type¶
- class flow.environment.SimpleSchedulerEnvironment[source]¶
Bases:
flow.environment.ComputeEnvironment
An environment for the simple-scheduler scheduler.
- scheduler_type¶
Schedulers¶
Defines the API for the scheduling system.
- class flow.scheduling.base.JobStatus(value)[source]¶
Bases:
enum.IntEnum
Classifies the job’s execution status.
Group statuses exist to enable status output for individual operations within a larger group.
- active = 7¶
The cluster job is actively running.
- error = 8¶
The cluster job is in an error or failed state.
- group_active = 14¶
The operation is in a group that is actively running.
- group_error = 15¶
The operation is in a group that is in an error or failed state.
- group_held = 12¶
The operation is in a group that is held.
- group_inactive = 10¶
The operation is in a group that is inactive.
This includes states like completed, cancelled, or timed out.
- group_queued = 13¶
The operation is in a group that is queued.
- group_registered = 9¶
The operation is in a group that is registered with the scheduler.
- group_submitted = 11¶
The operation is in a group that has been submitted.
Note that this state is never returned by a scheduler, but is an assumed state immediately after a group containing the operation is submitted.
- held = 5¶
The cluster job is held.
- inactive = 3¶
The cluster job is inactive.
This includes states like completed, cancelled, or timed out.
- placeholder = 127¶
A placeholder state that is used for status rendering when no operations are eligible.
- queued = 6¶
The cluster job is queued.
- registered = 2¶
The cluster job is registered with the scheduler, but no other status is known.
- submitted = 4¶
The cluster job has been submitted.
Note that this state is never returned by a scheduler, but is an assumed state immediately after a cluster job is submitted.
- unknown = 1¶
Unknown cluster job status.
- user = 128¶
All user-defined states must be >=128 in value.
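Because JobStatus is an IntEnum, its members can be compared and used as integers. The enum below is a local stand-in that copies the values listed above so the sketch runs without signac-flow; the two helper functions are hypothetical, and the constant offset of 7 between a status and its group counterpart is simply read off the listed values.

```python
from enum import IntEnum


class JobStatus(IntEnum):
    """Stand-in mirroring the documented values."""

    unknown = 1
    registered = 2
    inactive = 3
    submitted = 4
    held = 5
    queued = 6
    active = 7
    error = 8
    group_registered = 9
    group_inactive = 10
    group_submitted = 11
    group_held = 12
    group_queued = 13
    group_active = 14
    group_error = 15
    placeholder = 127
    user = 128


def is_user_defined(value):
    """User-defined states must be at least JobStatus.user (128)."""
    return value >= JobStatus.user


def group_variant(status):
    """Map a cluster job status (registered..error) to its group status."""
    if not JobStatus.registered <= status <= JobStatus.error:
        raise ValueError("status has no group counterpart")
    return JobStatus(status + 7)  # offset observed in the listed values
```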
- class flow.scheduling.base.ClusterJob(job_id, status=None)[source]¶
Bases:
object
Class representing a cluster job.
- class flow.scheduling.base.Scheduler[source]¶
Bases:
abc.ABC
Abstract base class for schedulers.
- abstract jobs()[source]¶
Yield all cluster jobs.
- Yields
ClusterJob
– Cluster job.
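The contract of the abstract jobs() method can be illustrated with stand-in classes. `ClusterJob` and `Scheduler` below are simplified local copies that follow the signatures shown above, and `InMemoryScheduler` is a hypothetical subclass that yields jobs from a dictionary instead of querying a real scheduler.

```python
from abc import ABC, abstractmethod


class ClusterJob:
    """Simplified stand-in with the documented constructor signature."""

    def __init__(self, job_id, status=None):
        self.job_id = job_id
        self.status = status


class Scheduler(ABC):
    """Simplified stand-in for the abstract scheduler base class."""

    @abstractmethod
    def jobs(self):
        """Yield all cluster jobs."""


class InMemoryScheduler(Scheduler):
    """Hypothetical scheduler backed by a dictionary of job statuses."""

    def __init__(self, records):
        self._records = dict(records)

    def jobs(self):
        for job_id, status in self._records.items():
            yield ClusterJob(job_id, status)
```

The base class cannot be instantiated directly; a concrete scheduler only needs to provide a jobs() generator.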
- class flow.scheduling.fake_scheduler.FakeScheduler[source]¶
Bases:
flow.scheduling.base.Scheduler
Implementation of the abstract
Scheduler
class for a fake scheduler.
This scheduler does not actually schedule (or execute) any jobs, but it can be used to test the submission workflow.
- classmethod is_present()[source]¶
Return False.
The
FakeScheduler
is never present unless manually specified.
- class flow.scheduling.lsf.LSFScheduler(user=None)[source]¶
Bases:
flow.scheduling.base.Scheduler
Implementation of the abstract Scheduler class for LSF schedulers.
This class can submit cluster jobs to an LSF scheduler and query their current status.
- Parameters
user (str) – Limit the status information to cluster jobs submitted by user.
**kwargs – Forwarded to the parent constructor.
- submit(script, *, after=None, hold=False, pretend=False, flags=None, **kwargs)[source]¶
Submit a job script for execution to the scheduler.
- Parameters
script (str) – The job script submitted for execution.
after (str) – Execute the submitted script after a job with this id has completed. (Default value = None)
hold (bool) – Whether to hold the job upon submission. (Default value = False)
pretend (bool) – If True, do not actually submit the script, but only simulate the submission. Can be used to test whether the submission would be successful. Please note: A successful “pretend” submission does not guarantee that an actual submission will succeed. (Default value = False)
flags (list) – Additional arguments to pass through to the scheduler submission command. (Default value = None)
**kwargs – Additional keyword arguments (ignored).
- Returns
True if the submission command succeeds (or in pretend mode).
- Return type
- Raises
SubmitError – If the submission command fails.
- class flow.scheduling.pbs.PBSScheduler(user=None)[source]¶
Bases:
flow.scheduling.base.Scheduler
Implementation of the abstract Scheduler class for PBS schedulers.
This class can submit cluster jobs to a PBS scheduler and query their current status.
- Parameters
user (str) – Limit the status information to cluster jobs submitted by user.
**kwargs – Forwarded to the parent constructor.
- submit(script, *, after=None, hold=False, pretend=False, flags=None, **kwargs)[source]¶
Submit a job script for execution to the scheduler.
- Parameters
script (str) – The job script submitted for execution.
after (str) – Execute the submitted script after a job with this id has completed. (Default value = None)
hold (bool) – Whether to hold the job upon submission. (Default value = False)
pretend (bool) – If True, do not actually submit the script, but only simulate the submission. Can be used to test whether the submission would be successful. Please note: A successful “pretend” submission does not guarantee that an actual submission will succeed. (Default value = False)
flags (list) – Additional arguments to pass through to the scheduler submission command. (Default value = None)
**kwargs – Additional keyword arguments (ignored).
- Returns
True if the submission command succeeds (or in pretend mode).
- Return type
- Raises
SubmitError – If the submission command fails.
- class flow.scheduling.simple_scheduler.SimpleScheduler[source]¶
Bases:
flow.scheduling.base.Scheduler
Implementation of the abstract Scheduler class for the bundled
simple-scheduler
.
The package signac-flow includes a script in
bin/simple-scheduler
that is a simple model of a cluster job scheduler. The
simple-scheduler
script is designed primarily for testing and demonstration.
This class can submit cluster jobs to the built-in simple scheduler and query their current status.
- submit(script, *, pretend=False, **kwargs)[source]¶
Submit a job script for execution to the scheduler.
- Parameters
script (str) – The job script submitted for execution.
pretend (bool) – If True, do not actually submit the script, but only simulate the submission. Can be used to test whether the submission would be successful. A successful “pretend” submission does not guarantee that an actual submission will succeed. (Default value = False)
**kwargs – Keyword arguments (ignored).
- Returns
True if the submission command succeeds (or in pretend mode).
- Return type
- Raises
SubmitError – If the submission command fails.
- class flow.scheduling.slurm.SlurmScheduler(user=None)[source]¶
Bases:
flow.scheduling.base.Scheduler
Implementation of the abstract Scheduler class for SLURM schedulers.
This class can submit cluster jobs to a SLURM scheduler and query their current status.
- Parameters
user (str) – Limit the status information to cluster jobs submitted by user.
**kwargs – Forwarded to the parent constructor.
- submit(script, *, after=None, hold=False, pretend=False, flags=None, **kwargs)[source]¶
Submit a job script for execution to the scheduler.
- Parameters
script (str) – The job script submitted for execution.
after (str) – Execute the submitted script after a job with this id has completed. (Default value = None)
hold (bool) – Whether to hold the job upon submission. (Default value = False)
pretend (bool) – If True, do not actually submit the script, but only simulate the submission. Can be used to test whether the submission would be successful. Please note: A successful “pretend” submission does not guarantee that an actual submission will succeed. (Default value = False)
flags (list) – Additional arguments to pass through to the scheduler submission command. (Default value = None)
**kwargs – Additional keyword arguments (ignored).
- Returns
True if the submission command succeeds (or in pretend mode).
- Return type
- Raises
SubmitError – If the submission command fails.
Error Classes¶
Definitions of Exception classes used in this package.
- exception flow.errors.ConfigKeyError[source]¶
Bases:
KeyError
Indicates that a config key was not found.
- exception flow.errors.DirectivesError[source]¶
Bases:
ValueError
Indicates that a directive was incorrectly set.
- exception flow.errors.FlowProjectDefinitionError[source]¶
Bases:
ValueError
Indicates an invalid FlowProject definition.
- exception flow.errors.NoSchedulerError[source]¶
Bases:
AttributeError
Indicates that there is no scheduler type defined for an environment class.
- exception flow.errors.SubmitError[source]¶
Bases:
RuntimeError
Indicates an error during cluster job submission.
- class flow.errors.TemplateError(environment: jinja2.environment.Environment)[source]¶
Bases:
jinja2.ext.Extension
Indicates an error in a jinja2 template.
- exception flow.errors.UserConditionError[source]¶
Bases:
RuntimeError
Indicates an error during evaluation of a condition.
- exception flow.errors.UserOperationError[source]¶
Bases:
RuntimeError
Indicates an error during execution of a
BaseFlowOperation
.