Source code for flow.scheduling.base

# Copyright (c) 2018 The Regents of the University of Michigan
# All rights reserved.
# This software is licensed under the BSD 3-Clause License.
"""Definition of base classes for the scheduling system."""
import enum
import subprocess
import tempfile
import time
from abc import ABC, abstractmethod

from ..errors import SubmitError

[docs]class JobStatus(enum.IntEnum): """Classifies the job's execution status.""" unknown = 1 """Unknown cluster job status.""" registered = 2 """The cluster job is registered with the scheduler, but no other status is known.""" inactive = 3 """The cluster job is inactive. This includes states like completed, cancelled, or timed out. """ submitted = 4 """The cluster job has been submitted. Note that this state is never returned by a scheduler, but is an assumed state immediately after a cluster job is submitted. """ held = 5 """The cluster job is held.""" queued = 6 """The cluster job is queued.""" active = 7 """The cluster job is actively running.""" error = 8 """The cluster job is in an error or failed state.""" placeholder = 127 """A placeholder state that is used for status rendering when no operations are eligible.""" user = 128 """All user-defined states must be >=128 in value."""
[docs]class ClusterJob: """Class representing a cluster job.""" def __init__(self, job_id, status=None): self._job_id = job_id self._status = status def _id(self): return self._job_id def __str__(self): """Return job ID string.""" return str(self._id())
[docs] def name(self): """Return the name of the cluster job.""" return self._id()
[docs] def status(self): """Return the status of the cluster job.""" return self._status
[docs]class Scheduler(ABC): """Abstract base class for schedulers.""" # The UNIX time stamp of the last scheduler query. _last_query = None # The amount of time in seconds to wait between scheduler queries. # Repeated scheduler queries might risk a denial-of-service attack. _dos_timeout = 10 @classmethod def _prevent_dos(cls): """Prevent denial of service by enforcing a back-off period. This method should *always* be called before querying the scheduler. This method will raise an exception if it is called more than once within a time window defined by the value of ``_dos_timeout``. This is to prevent an (accidental) denial-of-service attack on the scheduling system. """ if cls._last_query is not None: if time.time() - cls._last_query < cls._dos_timeout: raise RuntimeError("Too many scheduler requests within a short time!") cls._last_query = time.time()
[docs] @abstractmethod def jobs(self): """Yield all cluster jobs. Yields ------ :class:`.ClusterJob` Cluster job. """ raise NotImplementedError()
[docs] @abstractmethod def submit(self, script, **kwargs): """Submit a job script to the scheduler for execution.""" raise NotImplementedError()
[docs] @classmethod @abstractmethod def is_present(cls): """Return True if the scheduler is detected.""" raise NotImplementedError()
def _call_submit(submit_cmd, script, pretend): """Call submit command with a temporary script file. Parameters ---------- submit_cmd : list[str] List of strings composing the submission command and any flags. script : str Script as a string. pretend : bool If True, the script will be printed to screen instead of submitted. Returns ------- bool True if the submission command succeeds (or in pretend mode). Raises ------ :class:`~flow.errors.SubmitError` If the submission command fails. """ submit_cmd_string = " ".join(submit_cmd) if pretend: print(f"# Submit command: {submit_cmd_string}") print(script) print() else: with tempfile.NamedTemporaryFile() as tmp_submit_script: tmp_submit_script.write(str(script).encode("utf-8")) tmp_submit_script.flush() submit_cmd.append( try: subprocess.check_output(submit_cmd, universal_newlines=True) except subprocess.CalledProcessError as error: raise SubmitError( f"Error when calling submission command {submit_cmd_string}: {error.output()}" ) return True