The FlowProject
This chapter describes how to setup a complete workflow via the implementation of a FlowProject
.
It includes two fundamental concepts for the implementation of workflows with the signac-flow package: operations and conditions.
Setup and Interface
To implement an automated workflow using signac-flow, we create a subclass of FlowProject
, here named MyProject
:
# project.py
from flow import FlowProject
class MyProject(FlowProject):
pass
if __name__ == "__main__":
MyProject().main()
Tip
The $ flow init
command will generate a minimal project.py
file like the one above.
There are multiple different templates available via the -t/--template
option.
Executing this script on the command line will give us access to this project’s specific command line interface:
~/my_project $ python project.py
Using environment configuration: StandardEnvironment
usage: project.py [-h] [-v] [--show-traceback] [--debug] {status,next,run,submit,exec} ...
Note
You can have multiple implementations of FlowProject
that all operate on the same signac data space!
This may be useful, for example, if you want to implement two very distinct workflows that operate on the same data space.
Simply put those in different modules, e.g., project_a.py
and project_b.py
.
Operations
It is highly recommended to divide individual modifications of your project’s data space into distinct functions.
In this context, an operation is defined as a function whose only positional arguments are instances of Job
.
We will demonstrate this concept with a simple example.
Let’s initialize a signac project with a few jobs, by executing the following init.py
script within a ~/my_project
directory:
# init.py
import signac
project = signac.init_project()
for i in range(10):
project.open_job({"a": i}).init()
A very simple operation, which creates a file called hello.txt
within the job directory, could be implemented like this:
# project.py
from flow import FlowProject
class MyProject(FlowProject):
pass
@MyProject.operation
def hello(job):
print("hello", job)
with job:
with open("hello.txt", "w") as file:
file.write("world!\n")
if __name__ == "__main__":
MyProject().main()
Tip
By default operations only act on a single job and can simply be defined with the signature def op(job)
.
When using aggregate operations, it is recommended to allow the operation to accept a variable number of jobs using a variadic parameter *jobs
, so that the operation is not restricted to a specific aggregate size.
Conditions
Here the operation()
decorator function specifies that the hello
operation function is part of our workflow.
If we run python project.py run
, signac-flow will execute hello
for all jobs in the project.
However, we only want to execute hello
if hello.txt
does not yet exist in the job directory.
To do this, we need to create a condition function named greeted
that tells us if hello.txt
already exists in the job directory:
def greeted(job):
return job.isfile("hello.txt")
To complete this component of the workflow, we use the post()
decorator function to specify that the hello
operation function should only be executed if the greeted
condition is not met.
The entirety of the code is as follows:
# project.py
from flow import FlowProject
class MyProject(FlowProject):
pass
def greeted(job):
return job.isfile("hello.txt")
# Pre/post condition decorators must appear on a line above the operation decorator
# so that the condition decorator is added after the operation decorator.
@MyProject.post(greeted)
@MyProject.operation
def hello(job):
with job:
with open("hello.txt", "w") as file:
file.write("world!\n")
if __name__ == "__main__":
MyProject().main()
Note
Decorators execute from the bottom to the top. For example, in the code block above
@MyProject.operation
is run before @MyProject.post(greeted)
. The code is roughly
equivalent to MyProject.post(greeted)(MyProject.operation(hello))
. See Python’s official
documentation
for more information.
We can define both pre()
and post()
conditions, which allow us to define arbitrary workflows as a directed acyclic graph.
An operation is only executed if all preconditions are met, and at at least one postcondition is not met.
These are added above a operation
decorator.
Using these decorators before declaring a function an operation is an error.
Tip
Cheap conditions should be placed before expensive conditions as they are evaluated lazily! That means for example, that given two pre-conditions, the following order of definition would be preferable:
@MyProject.pre(cheap_condition)
@MyProject.pre(expensive_condition)
@MyProject.operation
def hello(job):
pass
The same holds for post-conditions.
We can then execute this workflow with:
~/my_project $ python project.py run
Execute operation 'hello(15e548a2d943845b33030e68801bd125)'...
hello 15e548a2d943845b33030e68801bd125
Execute operation 'hello(288f97857257baee75d9d84bf0e9dfa8)'...
hello 288f97857257baee75d9d84bf0e9dfa8
Execute operation 'hello(2b985fa90138327bef586f9ad87fc310)'...
hello 2b985fa90138327bef586f9ad87fc310
# ...
If we implemented and integrated the operation and condition functions correctly, calling the run
command twice should not execute any operations the second time, since the greeted
condition is met for all jobs and the hello
operation should therefore not be executed.
Tip
The with_job
keyword argument can be used so the entire operation takes place in the job
context.
For example:
from flow import with_job
@MyProject.post(greeted)
@MyProject.operation(with_job=True)
def hello(job):
with open("hello.txt", "w") as file:
file.write("world!\n")
Is the same as:
@MyProject.post(greeted)
@MyProject.operation
def hello(job):
with job:
with open("hello.txt", "w") as file:
file.write("world!\n")
This saves a level of indentation and makes it clear the entire operation should take place in the job
context.
with_job
also works with the cmd
keyword argument:
@MyProject.operation(with_job=True, cmd=True)
def hello(job):
return "echo 'hello {}'".format(job)
The Project Status
The FlowProject
class allows us to generate a status view of our project.
The status view provides information about which conditions are met and what operations are pending execution.
A label function is a condition function which will be shown in the status view.
We can convert any condition function into a label function by adding the label()
decorator:
@MyProject.label
def greeted(job):
return job.isfile("hello.txt")
We will reset the workflow for only a few jobs to get a more interesting status view:
~/my_project $ signac find a.\$lt 5 | xargs -I{} rm workspace/{}/hello.txt
We then generate a detailed status view with:
~/my_project $ python project.py status --detailed --stack --pretty
Collect job status info: 100%|█████████████████████████████████████████████| 10/10
# Overview:
Total # of jobs: 10
label ratio
------- -------------------------------------------------
greeted |####################--------------------| 50.00%
# Detailed View:
job_id labels
-------------------------------- --------
0d32543f785d3459f27b8746f2053824 greeted
14fb5d016557165019abaac200785048
└● hello [U]
2af7905ebe91ada597a8d4bb91a1c0fc
└● hello [U]
2e6ba580a9975cf0c01cb3c3f373a412 greeted
42b7b4f2921788ea14dac5566e6f06d0
└● hello [U]
751c7156cca734e22d1c70e5d3c5a27f greeted
81ee11f5f9eb97a84b6fc934d4335d3d greeted
9bfd29df07674bc4aa960cf661b5acd2
└● hello [U]
9f8a8e5ba8c70c774d410a9107e2a32b
└● hello [U]
b1d43cd340a6b095b41ad645446b6800 greeted
Legend: ○:ineligible ●:eligible ▹:active ▸:running □:completed
This view provides information about what labels are met for each job and what operations are eligible for execution.
If we did things right, then only those jobs without the greeted
label should have the hello
operation pending.
We may hide the progress bar when generating the status view using the --hide-progress
flag.
~/my_project $ python project.py status --detailed --stack --pretty --hide-progress
# Overview:
Total # of jobs: 10
label ratio
------- -------------------------------------------------
greeted |####################--------------------| 50.00%
# Detailed View:
job_id labels
-------------------------------- --------
0d32543f785d3459f27b8746f2053824 greeted
14fb5d016557165019abaac200785048
└● hello [U]
2af7905ebe91ada597a8d4bb91a1c0fc
└● hello [U]
2e6ba580a9975cf0c01cb3c3f373a412 greeted
42b7b4f2921788ea14dac5566e6f06d0
└● hello [U]
751c7156cca734e22d1c70e5d3c5a27f greeted
81ee11f5f9eb97a84b6fc934d4335d3d greeted
9bfd29df07674bc4aa960cf661b5acd2
└● hello [U]
9f8a8e5ba8c70c774d410a9107e2a32b
└● hello [U]
b1d43cd340a6b095b41ad645446b6800 greeted
Legend: ○:ineligible ●:eligible ▹:active ▸:running □:completed
The same can be accomplished in Python (such as within a Jupyter cell) via,
project = MyProject.init_project()
project.print_status(detailed=True, parameters=["p"], hide_progress=True)
Hiding progress bars can declutter output, which can be useful when run in Jupyter notebooks.
As shown before, all eligible operations can then be executed with:
~/my_project $ python project.py run
Status is determined sequentially by default, because typically the overhead costs of using threads/processes are large.
However, this can be configured by setting a value for the flow.status_parallelization
configuration key.
Possible values are thread
, process
or none
with none
being the default value (turning off parallelization).
We can set the flow.status_parallelization
configuration value by directly editing the configuration file(s) or via the command line:
~/my_project $ signac config set flow.status_parallelization process
Check out the next section for a guide on how to submit operations to a cluster environment.