Source code for flow.hooks.track_operations

# Copyright (c) 2023 The Regents of the University of Michigan
# All rights reserved.
# This software is licensed under the BSD 3-Clause License.
"""Built in execution hook for basic tracking."""
import json

from .util import collect_metadata

# Git support is optional: ``GIT`` records whether ``collect_git_metadata``
# could be imported from ``.git_util``. Presumably the import fails when the
# GitPython package is not installed (see the error raised by
# ``TrackOperations.__init__``) -- TODO confirm against ``git_util``.
try:
    from .git_util import collect_git_metadata
except ImportError:
    GIT = False
else:
    GIT = True


# Default name of the per-job log file; used as the default ``log_filename``
# for both writing (``TrackOperations.__init__``) and reading
# (``TrackOperations.read_log``).
_DEFAULT_FILENAME = "signac_execution_history.log"


class TrackOperations:
    """:class:`~.TrackOperations` tracks information about the execution of operations to a logfile.

    This hook provides information on the start, successful completion, and/or
    error of one or more operations in a :class:`~.FlowProject` instance. The
    logs are stored in the file given by ``log_filename`` within the job's path.
    The file will be appended to if it already exists.

    The hook stores metadata regarding the execution of the operation and the
    state of the project at the time of execution, error, and/or completion.
    The data will also include information about the git status if the project
    is detected as a git repo and ``GitPython`` is installed in the environment.

    Each call to the hook adds a single JSON line to the log file. These can be
    read using the ``json`` builtin package or
    :meth:`~.TrackOperations.read_log`.

    The current schema has the following structure:

    - ``time``: The time of querying the metadata.
    - ``stage``: Whether the hook was executed either "prior" or "after" the
      associated operation's execution.
    - ``error``: The error message on executing the operation if any.
    - ``project``

        - ``path``: Filepath to the project
        - ``schema_version``: The project's schema version
        - ``git``: Only present when git metadata could be collected.

            - ``commit_id``: The current commit of the project's git repo.
            - ``dirty``: Whether the project's repo has uncommitted changes
              or not.

    - ``operation``: The operation name
    - ``job_id``: The job id
    - ``_schema_version``: The metadata storage's schema version.

    Warning
    -------
    This class will raise an exception when ``strict_git`` is set to ``True``
    and either GitPython is not available or the repository contains
    uncommitted changes (i.e. is "dirty").

    Examples
    --------
    The following example will install :class:`~.TrackOperations` at the
    operation level.

    .. code-block:: python

        from flow import FlowProject
        from flow.hooks import TrackOperations


        class Project(FlowProject):
            pass


        track = TrackOperations()


        @track.install_operation_hooks(Project)
        @Project.operation
        def foo(job):
            pass

    The code block below provides an example of how to install
    :class:`~.TrackOperations` to an instance of :class:`~.FlowProject`

    .. code-block:: python

        from flow import FlowProject
        from flow.hooks import TrackOperations


        class Project(FlowProject):
            pass


        if __name__ == "__main__":
            project = Project()
            project = TrackOperations().install_project_hooks(project)
            project.main()

    Parameters
    ----------
    log_filename : str, optional
        The name of the log file in the job workspace. Defaults to
        "signac_execution_history.log".
    strict_git : bool, optional
        Whether to fail if ``GitPython`` cannot be imported or if there are
        uncommitted changes to the project's git repo. Defaults to ``True``.

    Raises
    ------
    RuntimeError
        On construction, if ``strict_git`` is ``True`` but git support is
        unavailable.
    """

    def __init__(self, log_filename=_DEFAULT_FILENAME, strict_git=True):
        self.log_filename = log_filename
        # Fail early: a strict tracker that cannot inspect the repository
        # would otherwise silently log entries without any git information.
        if strict_git and not GIT:
            raise RuntimeError(
                "Unable to collect git metadata from the repository, "
                "because the GitPython package is not installed.\n\n"
                "You can use '{}(strict_git=False)' to ignore this "
                "error.".format(type(self).__name__)
            )
        self.strict_git = strict_git

    def _write_metadata(self, job, metadata):
        """Append ``metadata`` as a single JSON line to the job's log file."""
        with open(job.fn(self.log_filename), "a") as logfile:
            logfile.write(json.dumps(metadata) + "\n")

    def _get_metadata(self, operation, job, stage, error=None):
        """Collect the metadata describing one hook invocation.

        Parameters
        ----------
        operation
            The operation being tracked; forwarded to ``collect_metadata``.
        job
            The job the operation runs on.
        stage : str
            Either "prior" or "after", relative to the operation's execution.
        error : Exception, optional
            The error raised by the operation, if any. It is stored as its
            string representation.

        Returns
        -------
        dict
            The metadata to log.

        Raises
        ------
        RuntimeError
            If ``strict_git`` is set and the git repository is dirty.
        """
        # Add execution-related information to metadata.
        metadata = {"stage": stage, "error": None if error is None else str(error)}
        metadata.update(collect_metadata(operation, job))
        if GIT:
            git_metadata = collect_git_metadata(job)
            if self.strict_git and git_metadata["dirty"]:
                raise RuntimeError(
                    "Unable to reliably log operation, because the git repository in "
                    "the project root directory is dirty.\n\nMake sure to commit all "
                    "changes or ignore this warning by setting '{}(strict_git=False)'.".format(
                        type(self).__name__
                    )
                )
            metadata["project"]["git"] = git_metadata
        return metadata

    def on_start(self, operation, job):
        """Track the start of execution of an operation on a job."""
        self._write_metadata(job, self._get_metadata(operation, job, stage="prior"))

    def on_success(self, operation, job):
        """Track the successful completion of an operation on a job."""
        self._write_metadata(job, self._get_metadata(operation, job, stage="after"))

    def on_exception(self, operation, error, job):
        """Log errors raised during the execution of an operation on a job."""
        self._write_metadata(
            job, self._get_metadata(operation, job, stage="after", error=error)
        )

    def install_operation_hooks(self, op, project_cls=None):
        """Decorate operation to track execution.

        Parameters
        ----------
        op : function or type
            An operation function to log or a subclass of :class:`~.FlowProject`
            if ``project_cls`` is ``None``.
        project_cls : type
            A subclass of :class:`~.FlowProject`.

        Returns
        -------
        callable
            The operation ``op`` with the hooks installed, or, when
            ``project_cls`` is ``None``, a decorator that installs the hooks
            on the function it is applied to.
        """
        if project_cls is None:
            # Decorator form: ``op`` is actually the project class here.
            return lambda func: self.install_operation_hooks(func, op)
        project_cls.operation_hooks.on_start(self.on_start)(op)
        project_cls.operation_hooks.on_success(self.on_success)(op)
        project_cls.operation_hooks.on_exception(self.on_exception)(op)
        return op

    def install_project_hooks(self, project):
        """Install hooks to track all operations in a `flow.FlowProject`.

        Parameters
        ----------
        project : flow.FlowProject
            The project to install hooks on.

        Returns
        -------
        flow.FlowProject
            The same ``project`` instance, with the hooks appended.
        """
        project.project_hooks.on_start.append(self.on_start)
        project.project_hooks.on_success.append(self.on_success)
        project.project_hooks.on_exception.append(self.on_exception)
        return project

    @classmethod
    def read_log(cls, job, log_filename=_DEFAULT_FILENAME):
        """Return the execution log data as a list of dictionaries.

        Parameters
        ----------
        job : signac.job.Job
            The job to read the execution history of.
        log_filename : str, optional
            The name of the log file in the job workspace. Defaults to
            "signac_execution_history.log".

        Returns
        -------
        log : list[dict[str, any]]
            Returns the job's current execution history for logged operations.
        """
        with open(job.fn(log_filename)) as fh:
            # Lines read from a file keep their trailing newline, so comparing
            # against "" never filters anything; strip before testing so that
            # blank lines are skipped instead of breaking ``json.loads``.
            return [json.loads(line) for line in fh if line.strip()]