Source code for utopya_backend.model.base

"""This module implements the :py:class:`.BaseModel` class which can be
inherited from to implement a utopya-controlled model.

Its main aim is to provide shared simulation infrastructure; it does not make
further assumptions about the abstraction a model uses, like the step-wise
iteration done in :py:class:`~utopya_backend.model.step.StepwiseModel`."""

import abc
import random
import signal
import sys
import time
from typing import Optional, Union

import h5py as h5
import numpy as np

from ..logging import backend_logger as _backend_logger
from ..logging import get_level as _get_level
from ..signal import SIG_STOPCOND, SIGNAL_INFO, attach_signal_handlers
from ..tools import load_cfg_file as _load_cfg_file

# -----------------------------------------------------------------------------


class BaseModel(abc.ABC):
    """An abstract base model class that can be inherited from to implement
    a model. This class provides basic simulation infrastructure that couples
    to utopya as a frontend:

    - A shared RNG instance and a logger.
    - Logic to evaluate ``num_steps`` and ``write_{start,every}``.
    - Emission of monitor information to the frontend, informing utopya
      about simulation progress.

    For more specific purposes, there are specialized classes that are built
    around a shared modelling paradigm like step-wise iteration of the model,
    see :py:class:`~utopya_backend.model.step.StepwiseModel`.
    """

    ATTACH_SIGNAL_HANDLERS: bool = True
    """If true, calls :py:func:`~utopya_backend.signal.attach_signal_handlers`
    to attach signal handlers for stop conditions and interrupts.

    .. hint::

        You may want to disable this in a subclass if you cannot handle a
        case where a signal is meant to stop your simulation gracefully.
    """

    USE_SYS_EXIT: bool = True
    """If false, will not call :py:func:`sys.exit` upon a handled signal, but
    just return the exit code."""

    # .. Initialization and Teardown ..........................................

    def __init__(
        self,
        *,
        cfg_file_path: str,
        _log: "logging.Logger" = None,
    ):
        """Initialize the model instance, constructing an RNG and HDF5 group
        to write the output data to.

        .. todo::

            Allow initializing from a "parent model" such that hierarchical
            nesting of models becomes possible.

        Args:
            cfg_file_path (str): The path to the config file.
            _log (logging.Logger, optional): The logger instance from which
                to create a child logger for this model. If not given, will
                use the backend logger instance.
        """
        if _log is None:
            _log = _backend_logger

        # Get the root configuration and the instance name from there
        self._root_cfg = self._get_root_cfg(cfg_file_path, _log=_log)
        self._name = self._root_cfg["root_model_name"]

        _bases = (b.__name__ for b in type(self).__bases__)
        _log.info("Setting up model infrastructure ...")
        _log.info(" Class name: %s", type(self).__name__)
        _log.info(" Parent class: %s", ", ".join(_bases))
        _log.info(" Instance name: %s", self.name)

        # Can now set up the loggers
        self._log = None
        self._setup_loggers(_log)

        # Optionally attach signal handlers for stop conditions and interrupts
        if self.ATTACH_SIGNAL_HANDLERS:
            self._signal_info = SIGNAL_INFO
            self._attach_signal_handlers()

        # Monitoring
        self.log.info("Extracting monitoring settings ...")
        self._last_emit = 0
        self._monitor_info = dict()
        self._monitor_emit_interval = self.root_cfg["monitor_emit_interval"]

        # Keep track of the number of iterations
        self._n_iterations = 0

        # RNG
        # In order to ensure that a simulation is deterministic even when not
        # using the model-specific RNG instance, some singleton-like default
        # RNGs are (by default) seeded additionally. To avoid equal random
        # number sequences, they are salted.
        self._rng = self._setup_rng(
            seed=self.root_cfg["seed"],
            seed_numpy_rng=self.root_cfg.get("seed_numpy_rng", True),
            seed_system_rng=self.root_cfg.get("seed_system_rng", True),
        )

        # Create the output file
        self._h5file = self._setup_output_file()
        self._h5group = self._setup_output_group()

        # Allow subclasses to parse root config parameters, potentially
        # adding more attributes
        self._parse_root_cfg(**self.root_cfg)

        # Actual model initialization
        self._cfg = self.root_cfg[self.name]
        self._invoke_setup()

        # First monitoring
        self.trigger_monitor(force=True)

        # Done.
        self.log.info(
            "Fully initialized %s named '%s'.\n",
            type(self).__name__,
            self.name,
        )

    def __del__(self):
        """Takes care of tearing down the model"""
        self.log.debug("Tearing down model instance ...")

        try:
            self._h5file.close()
        except Exception as exc:
            self.log.error(
                "Closing HDF5 file failed, got %s: %s", type(exc).__name__, exc
            )

        self.log.debug("Teardown complete.")

    # .. Properties ...........................................................

    @property
    def name(self) -> str:
        """Returns the name of this model instance"""
        return self._name

    @property
    def log(self) -> "logging.Logger":
        """Returns the model's logger instance"""
        return self._log

    @property
    def rng(self) -> "numpy.random.Generator":
        """Returns the shared random number generator instance"""
        return self._rng

    @property
    def h5group(self) -> "h5py.Group":
        """The HDF5 group this model should write to"""
        return self._h5group

    @property
    def root_cfg(self) -> dict:
        """Returns the root configuration of the simulation run"""
        return self._root_cfg

    @property
    def cfg(self) -> dict:
        """Returns the model configuration, ``self.root_cfg[self.name]``"""
        return self._cfg

    @property
    def n_iterations(self) -> int:
        """Returns the number of iterations performed by this base class,
        i.e. the number of times :py:meth:`.iterate` was called.

        .. note::

            This may not correspond to potentially existing other measures
            that specialized base classes implement. For instance,
            :py:meth:`utopya_backend.model.step.StepwiseModel.time` is *not*
            the same as the number of iterations.
        """
        return self._n_iterations

    # .. Simulation control ...................................................

    def run(self) -> int:
        """Performs a simulation run for this model, calling the
        :py:meth:`.iterate` method while :py:meth:`.should_iterate` is true.
        In addition, it takes care to invoke data writing and monitoring.

        Returns:
            int: exit code, non-zero upon handled signals
        """
        self._pre_run()
        self._invoke_prolog()

        self.log.info("Commencing model run ...")
        while self.should_iterate():
            self._pre_iterate()

            self._invoke_iterate()
            self._n_iterations += 1

            # Allow to monitor simulation progress
            self.trigger_monitor()

            # Allow writing data
            if self.should_write():
                self._invoke_write_data()

            # Inform about the iteration
            self.show_iteration_info()

            # Handle signals, which may lead to a sys.exit
            if (exit_code := self._check_signals()) is not None:
                self._invoke_epilog(finished_run=False)
                self._post_run(finished_run=False)

                if self.USE_SYS_EXIT:
                    self.log.info("Now exiting (code: %d) ...", exit_code)
                    sys.exit(exit_code)
                else:
                    self.log.info("Now returning (code: %d) ...", exit_code)
                    return exit_code

            self._post_iterate()

        self._invoke_epilog(finished_run=True)
        self._post_run(finished_run=True)
        self.log.info("Simulation run finished.\n")
        return 0

    # .. Abstract methods .....................................................

    @abc.abstractmethod
    def setup(self) -> None:
        """Called upon initialization of the model"""

    @abc.abstractmethod
    def should_iterate(self) -> bool:
        """A method that determines whether :py:meth:`.iterate` should be
        called or not."""

    @abc.abstractmethod
    def iterate(self) -> None:
        """Called repeatedly until the end of the simulation, which can come
        about either because :py:meth:`.should_iterate` returns False or
        because the run is stopped early, e.g. by a stop condition or an
        interrupt signal."""

    @abc.abstractmethod
    def should_write(self) -> bool:
        """A method that determines whether :py:meth:`.write_data` should be
        called after an iteration or not."""

    @abc.abstractmethod
    def write_data(self) -> None:
        """Performs data writing if :py:meth:`.should_write` returned true."""

    # .. Optionally subclassable methods ......................................

    def _parse_root_cfg(self, **_) -> None:
        """Invoked from within :py:meth:`.__init__`, parses and handles
        configuration parameters.

        .. hint::

            This method can be specialized in a subclass.
        """
        pass

    def _setup_finished(self) -> None:
        """Invoked from within :py:meth:`.__init__` after the call to the
        :py:meth:`.setup` method has finished.

        .. hint::

            This method can be specialized in a subclass.
        """
        pass

    def monitor(self, monitor_info: dict) -> dict:
        """Called when a monitor emission is imminent; should be used to
        update the (model-specific) monitoring information passed here as
        arguments.

        .. hint::

            This method can be specialized in a subclass.
        """
        return monitor_info
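
    # A hypothetical override might add a model-specific entry like the
    # following (sketch only; ``self._state`` and the ``mean_state`` key are
    # illustrative, not part of this base class):
    #
    #     def monitor(self, monitor_info: dict) -> dict:
    #         monitor_info["mean_state"] = float(self._state.mean())
    #         return monitor_info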

    def compute_progress(self) -> float:
        """Computes the progress of the simulation run. Should return a float
        between 0 and 1 and should *always* be monotonic.

        .. hint::

            This method can be specialized in a subclass.
        """
        return 0.0
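
    # A hypothetical override for a model that runs a fixed number of
    # iterations (``self._max_iter`` is illustrative, not part of this base
    # class); clamping keeps the value in [0, 1] and monotonic:
    #
    #     def compute_progress(self) -> float:
    #         return min(1.0, self.n_iterations / self._max_iter)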

    def show_iteration_info(self) -> None:
        """A method that informs about the current iteration"""
        self.log.debug("Finished iteration %d.", self.n_iterations)

    def prolog(self) -> None:
        """Invoked at the beginning of :py:meth:`.run`, before the first call
        to :py:meth:`.iterate`.

        .. hint::

            This method can be specialized in a subclass.
        """
        pass

    def epilog(self, *, finished_run: bool) -> None:
        """Always invoked at the end of :py:meth:`.run`.

        This may happen either after :py:meth:`.should_iterate` returned
        False or any time before that, e.g. due to an interrupt signal or a
        stop condition. In the latter case, ``finished_run`` will be False.

        .. hint::

            This method can be specialized in a subclass.
        """
        pass

    def _pre_run(self):
        """Invoked at beginning of :py:meth:`.run`"""
        pass

    def _post_run(self, *, finished_run: bool) -> None:
        """Invoked at end of :py:meth:`.run`"""
        pass

    def _pre_iterate(self):
        """Invoked at beginning of a *full* iteration"""
        pass

    def _post_iterate(self):
        """Invoked at end of a *full* iteration (including monitoring, data
        writing etc.)"""
        pass

    def _pre_monitor(self):
        """Invoked before monitor emission"""
        pass

    def _post_monitor(self):
        """Invoked after monitor emission"""
        pass

    # .. Signalling ...........................................................

    def _attach_signal_handlers(self):
        """Invoked from :py:meth:`.__init__`, attaches a signal handler for
        the stop condition signal and other interrupts.

        .. note::

            This should only be overwritten if you want or need to do your
            own signal handling.
        """
        self.log.info("Attaching signal handlers ...")
        attach_signal_handlers()

    def _check_signals(self) -> Union[None, int]:
        """Evaluates whether the iteration should stop due to an (expected)
        signal, e.g. from a stop condition or an interrupt.

        If it should stop, will return an integer, which can then be passed
        to :py:func:`sys.exit`. Exit codes will be ``128 + abs(signum)``, as
        is convention. This is also expected by
        :py:class:`~utopya.workermanager.WorkerManager` and is used to behave
        differently on a stop-condition-related signal than on an interrupt
        signal.

        Returns:
            Union[None, int]: An integer if the signal denoted that there
                should be a system exit; None otherwise.
        """
        signal_info = self._signal_info

        if not signal_info["got_signal"]:
            return None

        # Received a signal
        signum = signal_info["signum"]
        if signum == SIG_STOPCOND:
            self.log.warning("A stop condition was fulfilled.")

        elif signum in (signal.SIGINT, signal.SIGTERM):
            self.log.warning("Was told to stop.")

        else:
            self.log.warning(
                "Got an unexpected signal: %d. Stopping ...", signum
            )

        return 128 + abs(signum)
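
    # Exit code examples, using the standard POSIX signal numbers:
    # SIGINT (2) -> 130, SIGTERM (15) -> 143; a fulfilled stop condition maps
    # analogously via 128 + abs(signum) of SIG_STOPCOND.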

    # .. Monitoring ...........................................................

    def _monitor_should_emit(self, *, t: float = None) -> bool:
        """Evaluates whether the monitor should emit. This method will only
        return True once a monitor emit interval has passed since the last
        time the monitor was emitted.

        Args:
            t (float, optional): If given, uses this time, otherwise calls
                :py:func:`time.time`.

        Returns:
            bool: Whether to emit or not.
        """
        t = t if t is not None else time.time()
        if t > self._last_emit + self._monitor_emit_interval:
            return True
        return False

    def _emit_monitor(self):
        """Actually emits the monitoring information using :py:func:`print`."""
        # TODO Consider using YAML for creating the monitor string

        def parse_val(v) -> str:
            if isinstance(v, float):
                return f"{v:6g}"
            return repr(v)

        progress = str(self.compute_progress())
        monitor_info = ", ".join(
            f"{k}: {parse_val(v)}" for k, v in self._monitor_info.items()
        )

        # Now emit ...
        # fmt: off
        print(
            "!!map { "
            + "progress: " + progress + ", "
            + "n_iter: " + str(self.n_iterations) + ", "
            + self.name + ": {" + monitor_info + "}"
            + "}",
            flush=True,
        )
        # fmt: on
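
    # An emitted line looks roughly like this (illustrative model name and
    # monitoring entries):
    #
    #     !!map { progress: 0.25, n_iter: 10, MyModel: {mean_state:   0.41}}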

    def trigger_monitor(self, *, force: bool = False):
        """Invokes the monitoring procedure:

        #. Checks whether :py:meth:`._monitor_should_emit`.
        #. If so, calls :py:meth:`.monitor` to update monitoring information.
        #. Then calls :py:meth:`._emit_monitor` to emit that information.

        If ``force`` is given, will always emit.

        .. hint::

            This method should not be subclassed, but it can be invoked from
            within the subclass at any desired point.
        """
        t = time.time()
        if force or self._monitor_should_emit(t=t):
            self._pre_monitor()

            self._monitor_info = self.monitor(self._monitor_info)
            self._emit_monitor()
            self._last_emit = t

            self._post_monitor()
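
    # Illustrative use: a subclass whose :py:meth:`.setup` takes a long time
    # could call ``self.trigger_monitor()`` periodically from within it to
    # keep the frontend's progress information up to date.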

    # .. Other helpers ........................................................

    def _get_root_cfg(
        self, cfg_file_path: str, *, _log: "logging.Logger"
    ) -> dict:
        """Retrieves the root configuration for this simulation run by
        loading it from the given file path."""
        _log.info("Loading configuration file ...\n %s", cfg_file_path)
        return _load_cfg_file(cfg_file_path)

    def _setup_loggers(self, _log: "logging.Logger"):
        """Sets up the model logger and configures the backend logger
        according to the ``log_levels`` entry set in the root configuration.

        .. todo::

            Allow setting the logging *format* as well.

        Args:
            _log (logging.Logger): The logger to initialize the model logger
                from, typically the backend logger.
        """
        _log.info("Setting up loggers ...")

        # TODO Allow changing log formatters globally

        # Model logger and its level
        self._log = _log.getChild(self.name)

        log_level = self.root_cfg["log_levels"]["model"]
        self._log.setLevel(_get_level(log_level))
        self.log.info(" Model logger initialized with '%s' level.", log_level)

        # May want to adjust the backend logger
        backend_log_level = self.root_cfg["log_levels"].get("backend")
        if backend_log_level is not None:
            _backend_logger.setLevel(_get_level(backend_log_level))
            self.log.info(
                " Set backend logger's level to '%s'.", backend_log_level
            )

    def _setup_rng(
        self,
        *,
        seed: int,
        seed_numpy_rng: Optional[Union[bool, int]] = None,
        seed_system_rng: Optional[Union[bool, int]] = None,
        **rng_kwargs,
    ) -> "numpy.random.Generator":
        """Sets up the shared RNG.

        .. note::

            If also seeding the other RNGs, make sure to use different seeds
            for them, such that random number sequences are ensured to be
            different even if the underlying generator may be the same.

        Args:
            seed (int): The seed for the new, model-specific RNG, constructed
                via :py:func:`numpy.random.default_rng`
            seed_numpy_rng (Optional[Union[bool, int]], optional): If not
                False or None, will also seed numpy's singleton (i.e.
                *default*) RNG by calling :py:func:`numpy.random.seed`.
                If True, will use ``seed + 1`` for that.
            seed_system_rng (Optional[Union[bool, int]], optional): If not
                False or None, will also seed the system's default RNG by
                calling :py:func:`random.seed`. If True, will use
                ``seed + 2`` for that.
            **rng_kwargs: Passed on to :py:func:`numpy.random.default_rng`
        """
        self.log.info("Creating shared RNG (seed: %s) ...", seed)
        rng = np.random.default_rng(seed, **rng_kwargs)

        if seed_numpy_rng not in (None, False):
            if seed_numpy_rng is True:
                seed_numpy_rng = seed + 1
            np.random.seed(seed_numpy_rng)
            self.log.debug(
                " Default numpy RNG seeded with: %s", seed_numpy_rng
            )

        if seed_system_rng not in (None, False):
            if seed_system_rng is True:
                seed_system_rng = seed + 2
            random.seed(seed_system_rng)
            self.log.debug(
                " Default system RNG seeded with: %s", seed_system_rng
            )

        return rng
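
    # Example of the salting: with ``seed: 42`` in the root configuration and
    # both flags set to True, the model-specific RNG uses seed 42, numpy's
    # default RNG is seeded with 43, and Python's ``random`` module with 44.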

    def _setup_output_file(self) -> "h5py.File":
        """Creates the output file for this model; by default, it is an HDF5
        file that is managed by a :py:class:`h5py.File` object.

        .. note::

            This method can be overwritten in a subclass to implement
            different output file formats. In that case, consider not using
            the ``_h5file`` and ``_h5group`` attributes but something else.
        """
        self.log.info(
            "Creating HDF5 output file at:\n %s\n",
            self.root_cfg["output_path"],
        )
        return h5.File(self.root_cfg["output_path"], mode="x")

    def _setup_output_group(self, h5file: "h5py.File" = None) -> "h5py.Group":
        """Creates the group that this model's output is written to"""
        if h5file is None:
            h5file = self._h5file
        return h5file.create_group(self.name)

    # .. Invocation wrappers ..................................................

    def _invoke_iterate(self):
        """Helps invoking the :py:meth:`.iterate` method"""
        self.iterate()

    def _invoke_write_data(self):
        """Helps invoking the :py:meth:`.write_data` method"""
        self.write_data()

    def _invoke_setup(self):
        """Helps invoking the :py:meth:`.setup` method"""
        self.log.info("Invoking model setup ...")
        self.setup(**self.cfg)
        self.log.info("Model setup finished.\n")

        self._setup_finished()

    def _invoke_prolog(self):
        """Helps invoking the :py:meth:`.prolog`"""
        self.log.debug("Invoking prolog ...")
        self.prolog()
        self.log.debug("Prolog finished.\n")

    def _invoke_epilog(self, **kwargs):
        """Helps invoking the :py:meth:`.epilog`"""
        self.log.debug("Invoking epilog ...")
        self.epilog(**kwargs)
        self.log.debug("Epilog finished.\n")
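
# -----------------------------------------------------------------------------
# Illustrative usage sketch (not part of this module): a minimal concrete
# model implements the five abstract methods. ``MyModel``, its config entries,
# and the dataset name are hypothetical. Note that ``setup`` receives the
# entries of the model-level configuration as keyword arguments (see
# ``_invoke_setup`` above), and ``write_data`` is called after the iteration
# counter has already been incremented.
#
#     from utopya_backend.model.base import BaseModel
#
#     class MyModel(BaseModel):
#         def setup(self, *, max_iter: int = 100, **_):
#             self._max_iter = max_iter
#             self._dset = self.h5group.create_dataset(
#                 "state", shape=(max_iter,), dtype="float64"
#             )
#
#         def should_iterate(self) -> bool:
#             return self.n_iterations < self._max_iter
#
#         def iterate(self) -> None:
#             self._current = self.rng.uniform()
#
#         def should_write(self) -> bool:
#             return True
#
#         def write_data(self) -> None:
#             self._dset[self.n_iterations - 1] = self._current
#
#     model = MyModel(cfg_file_path="path/to/cfg.yml")
#     sys.exit(model.run())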