Advanced How-To
This is a collection of recipes on how to solve typical problems using signac.
Migrating (changing) the project schema
Adding/renaming/deleting keys
Oftentimes, one discovers at a later stage that important parameters are missing from the project schema. For example, in the tutorial we model a gas using the ideal gas law, but we might discover later that important effects are not captured by this overly simplistic model and decide to replace it with the van der Waals equation:
\[\left(p + \frac{N^2 a}{V^2}\right)\left(V - N b\right) = N kT\]
Since the ideal gas law can be considered a special case of the equation above with \(a=b=0\), we could migrate all jobs with:
>>> for job in project:
...     job.sp.setdefault("a", 0)
...     job.sp.setdefault("b", 0)
...
The setdefault() function sets the values of \(a\) and \(b\) to 0 if they are not already present.
To delete a key, use del job.sp['key_to_be_removed'].
To rename a key, use job.sp.new_name = job.sp.pop('old_name').
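For example, here is a minimal sketch of applying such changes across an entire project (the keys old_name, new_name, and obsolete are hypothetical and serve only as illustration):
import signac

for job in signac.get_project():
    # Hypothetical example keys:
    if "old_name" in job.sp:
        job.sp.new_name = job.sp.pop("old_name")
    if "obsolete" in job.sp:
        del job.sp["obsolete"]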
Note
The job.sp and job.doc attributes provide all basic functions of a regular Python dict.
Applying document-wide changes
The safest approach to apply multiple document-wide changes is to replace the document in one operation. Here is an example of how we could recursively replace all dot (.) characters with underscores in all keys [1]:
import signac
from collections.abc import Mapping


def migrate(doc):
    if isinstance(doc, Mapping):
        return {k.replace(".", "_"): migrate(v) for k, v in doc.items()}
    else:
        return doc


for job in signac.get_project():
    job.sp = migrate(job.sp)
    job.doc = migrate(job.doc)
This approach also makes it easy to compare the pre- and post-migration states before actually applying them.
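For instance, a minimal dry-run sketch that only prints the proposed changes (here for the document) without writing them could look like this:
# Dry run: print the proposed migration without applying it.
for job in signac.get_project():
    print(job)
    print("  before:", dict(job.doc))
    print("  after: ", migrate(job.doc))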
Initializing state points with replica indices
We often require multiple jobs with the same state point to collect enough information to make statistical inferences about the data. Instead of creating multiple projects to handle this, we can add a replica_index to the state point. For example, we can use the following code to generate 3 copies of each state point in a workspace:
# init.py
import signac

project = signac.init_project()
num_reps = 3

# Assign a replica index of 0 to any pre-existing jobs that do not have one yet.
jobs = project.find_jobs({"replica_index.$exists": False})
for job in jobs:
    job.sp["replica_index"] = 0

for i in range(num_reps):
    for p in range(1, 11):
        sp = {"p": p, "kT": 1.0, "N": 1000, "replica_index": i}
        project.open_job(sp).init()
Defining a grid of state point values
Some signac project schemas are structured like a “grid” where the goal is an exhaustive search or a Cartesian product of multiple sets of parameters. While this can be done with nested for loops, that approach can be cumbersome for state points with many keys. Here we offer a helper function that can assist in this kind of initialization, inspired by this StackOverflow answer:
# init.py
import itertools

import signac

project = signac.init_project()


def grid(gridspec):
    """Yields the Cartesian product of a `dict` of iterables.

    The input ``gridspec`` is a dictionary whose keys correspond to
    parameter names. Each key is associated with an iterable of the
    values that parameter could take on. The result is a sequence of
    dictionaries where each dictionary has one of the unique combinations
    of the parameter values.
    """
    for values in itertools.product(*gridspec.values()):
        yield dict(zip(gridspec.keys(), values))


statepoint_grid = {"p": range(1, 11), "kT": [1.0, 5.0, 10.0], "N": [1000, 4000]}

for sp in grid(statepoint_grid):
    print("Initializing job", sp)
    project.open_job(sp).init()
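The grid helper also combines naturally with the replica-index recipe above; for example, adding a replica_index entry to the grid (shown here as an illustrative sketch) would initialize three replicas of every parameter combination:
statepoint_grid = {
    "p": range(1, 11),
    "kT": [1.0, 5.0, 10.0],
    "N": [1000, 4000],
    "replica_index": range(3),
}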
Creating parameter-dependent operations
Operations defined as functions in a signac-flow workflow can only have one required argument: the job. This restriction ensures the reproducibility of these operations: an operation should be a true function of the job’s data, without any hidden parameters.
Here we show how to define operations that are a function of one or more additional parameters without violating the principle mentioned above. Assuming that we have an operation called foo, which depends on a parameter bar, here is how we could implement multiple operations that depend on that additional parameter without code duplication:
from flow import FlowProject


class Project(FlowProject):
    pass


def setup_foo_workflow(bar):
    # Make sure to make the operation name a function of the parameter(s)!
    @Project.post(lambda job: bar in job.doc.get("foo", []))
    @Project.operation(f"foo-{bar}")
    def foo(job):
        job.doc.setdefault("foo", []).append(bar)


for bar in (4, 8, 15, 16, 23, 42):
    setup_foo_workflow(bar=bar)
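Each generated operation can then be addressed by its own name; for example, assuming the workflow is defined in project.py, the variant for bar=42 could be run with:
$ python project.py run -o foo-42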
Using signac-flow with MATLAB or other software without Python interface
The easiest way to integrate software that has no native Python interface is to implement signac-flow operations with the cmd keyword argument of FlowProject.operation.
Assuming that we have a MATLAB script called prog.m within the project root directory:
% prog.m
function [] = prog(arg1, arg2)
    display(arg1);
    display(arg2);
    exitcode = 0;
Then, we could implement a simple operation that passes it some metadata parameters like this:
@FlowProject.operation(cmd=True)
def compute_volume(job):
    return "matlab -r 'prog {job.sp.foo} {job.sp.bar}' > {job.ws}/output.txt"
Executing this operation will store the output of the MATLAB script in a file called output.txt within the job’s workspace.
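To make the workflow aware of that output file, one possible sketch is to attach a post-condition that checks for it, so the same operation is only eligible as long as output.txt does not yet exist:
@FlowProject.post.isfile("output.txt")
@FlowProject.operation(cmd=True)
def compute_volume(job):
    return "matlab -r 'prog {job.sp.foo} {job.sp.bar}' > {job.ws}/output.txt"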
Running MPI-parallelized operations
There are two basic strategies for implementing MPI-parallelized FlowProject operations: one for external programs and one for Python scripts.
MPI-operations with mpi4py or similar
If your operation uses mpi4py or a similar package, you do not have to change your code:
@FlowProject.operation
def hello_mpi(job):
    from mpi4py import MPI

    print("Hello from rank", MPI.COMM_WORLD.Get_rank())
You could run this operation directly with: mpiexec -n 2 python project.py run -o hello_mpi.
Note
This strategy might fail in cases where you cannot ensure that the MPI communicator is initialized within the operation function.
Danger
Read and write operations to the job and project documents are not protected against race conditions and should only be executed on one rank at a time. This can be ensured, for example, like this:
from mpi4py import MPI

comm = MPI.COMM_WORLD
if comm.Get_rank() == 0:
    job.doc.foo = "abc"
comm.barrier()
MPI-operations using the command line
Alternatively, you can implement an MPI-parallelized operation with the cmd keyword argument of the FlowProject.operation decorator.
This strategy lets you define the number of ranks directly within the code and is also the only possible strategy when integrating external programs without a Python interface.
Assuming that we have an MPI-parallelized program named my_program, which expects an input file as its first argument and which we want to run on two ranks, we could implement the operation like this:
@FlowProject.operation(cmd=True, directives={"np": 2})
def hello_mpi(job):
    return "mpiexec -n 2 my_program {job.ws}/input_file.txt"
The cmd keyword argument instructs signac-flow to interpret the operation as a command rather than a Python function.
The directives keyword argument provides additional instructions on how to execute this operation.
Some script templates, including those designed for HPC cluster submissions, use the value of the np key to compute the number of ranks required for a specific submission.
Tip
You do not have to hard-code the number of ranks; it may be a function of the job, e.g.: FlowProject.operation(directives={"np": lambda job: job.sp.system_size // 1000}).
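As a minimal sketch (the operation name and the system_size state point key are illustrative), such a callable directive could be attached like this:
@FlowProject.operation(directives={"np": lambda job: job.sp.system_size // 1000})
def run_simulation(job):
    pass  # The MPI-parallelized work would go here.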
MPI-operations with custom script templates
Finally, instead of modifying the operation implementation, you could use a custom script template, such as this one:
{% extends base_script %}
{% block body %}
{% for operation in operations %}
mpiexec -n {{ operation.directives.np }} {{ operation.cmd }}
{% endfor %}
{% endblock %}
Storing the above template in a file called templates/script.sh within your project root directory will prepend every operation command with mpiexec and the number of ranks specified in its directives.
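To inspect the script generated from this template without actually submitting anything, you can, for example, use the --pretend flag:
$ python project.py submit --pretend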
Forcing the execution of a specific operation for debugging
Sometimes it is necessary to repeatedly run a specific operation even though it is not technically eligible for execution.
The easiest way to do so is to temporarily add the @FlowProject.post.never post-condition to that specific operation definition.
As the name implies, the post.never condition is never true, so as long as the pre-conditions are met, the operation is always eligible for execution.
For example:
# [...]
@Project.pre.after(bar)
@Project.post.isfile("foo.txt")
@Project.post.never  # TODO: Remove after debugging
@Project.operation
def foo(job):
    pass
# ...
Then you could execute the operation for a hypothetical job with id abc123, for example with $ python project.py run -o foo -j abc123, irrespective of whether the foo.txt file exists or not.
Running in containerized environments
Using signac-flow in combination with container systems such as Docker or Singularity is easily achieved by modifying the executable directive.
For example, assuming that we want to use a Singularity container named software.simg, placed within the project root directory, we use the following directive to specify that a given operation is to be executed within the container:
@Project.operation(directives={"executable": "singularity exec software.simg python"})
def containerized_operation(job):
    pass
If you are using the run command for execution, simply execute the whole script in the container:
$ singularity exec software.simg python project.py run
Attention
Many cluster environments will not allow you to submit jobs to the scheduler using the container image.
This means that the actual submission (e.g. python project.py submit or similar) will need to be executed with a local Python executable.
To avoid issues with dependencies that are only available in the container image, move imports into the operation function. Condition functions are executed during the submission process to determine what to submit, so their dependencies must be installed in the local environment as well.
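For example, here is a minimal sketch of an operation that imports a container-only dependency (the package heavy_package and the operation name are hypothetical) inside the function body:
@Project.operation(directives={"executable": "singularity exec software.simg python"})
def containerized_analysis(job):
    # Imported here so that submission with the local Python executable
    # does not require this container-only dependency.
    import heavy_package

    heavy_package.analyze(job.fn("input.txt"))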
Tip
You can define a decorator that can be reused like this:
import flow


def on_container(func):
    return flow.directives(executable="singularity exec software.simg python")(func)


@on_container
@Project.operation
def containerized_operation(job):
    pass
Using multiple execution environments for operations
Suppose that for a given project you wanted to run jobs on multiple supercomputers, your laptop, and your desktop. On each of these machines, different operation directives may be needed. The FlowGroup class provides a mechanism to easily specify the different requirements of each environment.
# project.py
from flow import FlowProject


class Project(FlowProject):
    pass


supercomputer = Project.make_group(name="supercomputer")
laptop = Project.make_group(name="laptop")
desktop = Project.make_group(name="desktop")


@supercomputer(
    directives={
        "ngpu": 4,
        "executable": "singularity exec --nv /path/to/container python",
    }
)
@laptop(directives={"ngpu": 0})
@desktop(directives={"ngpu": 1})
@Project.operation
def op1(job):
    pass


@supercomputer(
    directives=dict(nranks=40, executable="singularity exec /path/to/container python")
)
@laptop(directives={"nranks": 4})
@desktop(directives={"nranks": 8})
@Project.operation
def op2(job):
    pass


if __name__ == "__main__":
    Project().main()
Tip
Sometimes a machine should only run certain operations. To restrict an operation to particular machines, decorate it only with the groups for those machines.
Tip
To test operations with a small interactive job, a ‘test’ group can be used to ensure that the operations do not try to run on multiple cores or GPUs.
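For example, a sketch of such a ‘test’ group (the name and directive values are illustrative) could cap the requested resources like this:
test = Project.make_group(name="test")


@test(directives={"nranks": 1, "ngpu": 0})
@Project.operation
def op3(job):
    pass
The group can then be executed or submitted with -o test.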
Passing command line options to operations run in a container or other environment
When executing an operation in a container (e.g. Singularity or Docker) or a different environment, the operation will not receive command line flags from the submitting process. FlowGroups can be used to pass options to an exec command. This example shows how to use the run_options argument to tell an operation executed in a container to run in debug mode.
# project.py
from flow import FlowProject


class Project(FlowProject):
    pass


# Anything in run_options will be passed to the forked exec command when the
# operation is run. Here we just pass the debug flag.
debug = Project.make_group("debug", run_options="--debug")


@debug
@Project.post.isfile("a.txt")
@Project.operation(directives={"executable": "/path/to/container exec python3"})
def op1(job):
    with open(job.fn("a.txt"), "w") as fh:
        fh.write("hello world")


if __name__ == "__main__":
    Project().main()
To run the operation with debugging, run the group called "debug" with python3 project.py run -o debug.