"""Benchmarking tools for Models"""
import logging
import time
from collections import OrderedDict
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
import h5py as h5
from dantro.tools import format_time as _format_time
log = logging.getLogger(__name__)
DEFAULT_TIMERS_ONE_SHOT: Tuple[str, ...] = (
"init",
"setup",
"prolog",
"run",
"epilog",
"teardown",
"simulation",
)
"""Names of default one-shot timers in :py:class:`.ModelBenchmarkMixin`"""
DEFAULT_TIMERS_CUMULATIVE: Tuple[str, ...] = (
"model_iteration",
"monitor",
"write_data",
"full_iteration",
)
"""Names of default cumulative timers in :py:class:`.ModelBenchmarkMixin`"""
# -----------------------------------------------------------------------------
[docs]class Timer:
"""Implements a simple timer that can be paused and continued."""
_name: str
_one_shot: bool
_time_func: Callable = time.time
_running: bool
_latest: float
_elapsed: float
_finished: bool

    def __init__(
        self,
        name: str,
        *,
        time_func: Optional[Callable] = None,
        one_shot: bool = False,
        start: bool = False,
    ):
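        """Sets up the timer.

        Args:
            name: The name of this timer.
            time_func: The function used to query the current time; if not
                given, :py:func:`time.time` is used.
            one_shot: If True, this timer can only pass through a single
                start-pause/stop cycle before being marked as finished.
            start: Whether to start the timer right away.
        """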
self._name = name
self._one_shot = one_shot
self.reset()
if time_func is not None:
self._time_func = time_func
if start:
self.start()

    def _get_time(self) -> float:
        return self._time_func()

    def _assert_not_finished(self):
if self.finished:
raise RuntimeError(
f"Tried to update timer '{self}' that was already marked "
"as finished."
)

    def __str__(self) -> str:
segments = []
segments.append(f"Timer '{self.name}'")
segments.append(f"{self.elapsed:.3g}s elapsed")
if self.running:
segments.append("running")
if self.finished:
segments.append("finished")
return f"<{', '.join(segments)}>"

    def start(self):
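        """Starts the timer.

        Raises:
            RuntimeError: If the timer was already marked as finished.
        """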
self._assert_not_finished()
self._unpause()

    def pause(self) -> float:
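        """Pauses the timer and returns the elapsed time.

        For one-shot timers, pausing also marks the timer as finished.

        Raises:
            RuntimeError: If the timer is not currently running or was
                already marked as finished.
        """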
self._assert_not_finished()
if not self.running:
raise RuntimeError(f"Cannot pause already paused timer {self}!")
self._elapsed += self._get_time() - self._latest
self._latest = None
self._running = False
if self.one_shot:
self._finished = True
return self.elapsed

    def unpause(self):
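        """Continues a previously paused timer.

        Raises:
            RuntimeError: If this is a one-shot timer or the timer was
                already marked as finished.
        """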
if self.one_shot:
raise RuntimeError(
f"{self} is a one-shot timer and cannot be unpaused!"
)
self._assert_not_finished()
self._unpause()

    def _unpause(self):
self._latest = self._get_time()
self._running = True

    def stop(self) -> float:
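        """Stops the timer, marks it as finished, and returns the total
        elapsed time."""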
self._assert_not_finished()
if self.running:
self.pause()
self._finished = True
return self.elapsed

    def reset(self):
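        """Resets the timer to a paused, unfinished state with zero elapsed
        time."""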
self._latest = None
self._running = False
self._finished = False
self._elapsed = 0.0

    @property
    def name(self) -> str:
        """The name of this timer"""
        return self._name

    @property
    def running(self) -> bool:
        """Whether the timer is currently running"""
        return self._running

    @property
    def finished(self) -> bool:
        """Whether the timer was marked as finished"""
        return self._finished

    @property
    def elapsed(self) -> float:
        """The elapsed time, including a currently running interval"""
        if self._running:
            return self._elapsed + (self._get_time() - self._latest)
        return self._elapsed

    @property
    def one_shot(self) -> bool:
        """Whether this is a one-shot timer"""
        return self._one_shot


# -----------------------------------------------------------------------------


class ModelBenchmarkMixin:
    """A mixin class that makes it easy to gather information on the run
    time required by individual parts of the model iteration and to store
    that information in the model's dataset.

    To use it, simply include it in your model class definition:

    .. testcode::

        from utopya_backend import BaseModel, ModelBenchmarkMixin

        class MyModel(ModelBenchmarkMixin, BaseModel):
            pass

    By default, benchmarking is enabled: the results are shown at the end of
    the run and are written to a separate benchmarking group within the
    model's default HDF5 data group.

    To further configure its behaviour, add a ``benchmark`` entry to your
    model's configuration. For available parameters and default values, refer
    to :py:meth:`._configure_benchmark`.
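
    Beyond the default timers, custom timers can be added and controlled from
    within the model, e.g. to benchmark individual parts of an iteration. A
    minimal sketch (the ``setup`` and ``perform_step`` hooks shown here are
    illustrative and depend on the base model class that is used):

    .. code-block:: python

        class MyModel(ModelBenchmarkMixin, BaseModel):

            def setup(self, **kwargs):
                self.add_cumulative_timers("my_computation")

            def perform_step(self):
                self.unpause_timer("my_computation")
                # ... the part of the iteration that is to be timed ...
                self.pause_timer("my_computation")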
"""

    _timers: Dict[str, Timer]

    _TIMER_FALLBACK_RV: Any = -1
    """The fallback value that is returned by :py:meth:`.pause_timer` and
    :py:meth:`.stop_timer` when benchmarking is completely disabled.
    """

    _dgrp_bench: Optional[h5.Group] = None
    _dset_total: Optional[h5.Dataset] = None
    _dset_cumulative: Optional[h5.Dataset] = None

    __dgrp_name: str
    __dset_dtype: str
    __dset_compression: int
    _dset_cumulative_invocation_times: List[int]

    __enabled: bool = True
    __write: Optional[bool] = None
    _show_on_exit: bool = True
    _add_time_elapsed_to_monitor_info: bool = False
    _time_elapsed_info_fstr: str
    # TODO consider not having default values here

    # .........................................................................

    def __init__(self, *args, **kwargs):
# Start with default values and timers
self._timers = OrderedDict()
self._add_default_timers()
self.start_timer("simulation")
self.start_timer("init")
super().__init__(*args, **kwargs)
self.stop_timer("init")
        # The benchmark configuration only becomes available now, after
        # super().__init__ has invoked the setup routine (see _invoke_setup).
self._configure_benchmark(**self._bench_cfg)
# Find out if the class this is mixed-in to has a step-based iteration
# scheme, in which case some procedures may run differently.
self._is_stepwise_model = hasattr(self, "write_start") and hasattr(
self, "write_every"
)
if self.__enabled:
# Create the group that benchmark data will be written to
if self.__write:
self._dgrp_bench = self.h5group.create_group(self.__dgrp_name)
self.log.info("Model benchmarking set up.")
else:
self.log.debug("Model benchmarking disabled.")

    def _add_default_timers(self):
self.add_one_shot_timers(*DEFAULT_TIMERS_ONE_SHOT)
self.add_cumulative_timers(*DEFAULT_TIMERS_CUMULATIVE)

    # .. Adding timers ........................................................

    def add_one_shot_timers(self, *names, **kwargs):
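        """Adds one-shot timers with the given names; additional keyword
        arguments are passed on to :py:class:`.Timer`."""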
for name in names:
self._add_timer(name, one_shot=True, **kwargs)

    def add_cumulative_timers(self, *names, **kwargs):
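        """Adds cumulative timers with the given names; these can be paused
        and unpaused arbitrarily often. Additional keyword arguments are
        passed on to :py:class:`.Timer`."""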
for name in names:
self._add_timer(name, one_shot=False, **kwargs)

    def _add_timer(self, name, *, one_shot: bool, **kwargs):
self.timers[name] = Timer(name, one_shot=one_shot, **kwargs)
return self.timers[name]

    # .. Controlling timers ...................................................

    @property
    def timers(self) -> Dict[str, Timer]:
        return self._timers

    def _get_timer(self, name: str) -> Timer:
try:
return self.timers[name]
except KeyError as err:
_avail = ", ".join(sorted(self.timers))
raise ValueError(
f"No benchmark timer named '{name}' was added!\n"
f"Available timers: {_avail}"
) from err

    def start_timer(self, name: str) -> None:
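        """Starts the timer of the given name. No-op if benchmarking is
        disabled."""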
if not self.__enabled:
return
self._get_timer(name).start()

    def pause_timer(self, name: str) -> Union[float, Any]:
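        """Pauses the timer of the given name and returns its elapsed time.
        If benchmarking is disabled, returns the fallback value
        :py:attr:`._TIMER_FALLBACK_RV` instead."""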
if not self.__enabled:
return self._TIMER_FALLBACK_RV
return self._get_timer(name).pause()

    def unpause_timer(self, name: str) -> None:
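        """Unpauses the timer of the given name. No-op if benchmarking is
        disabled."""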
if not self.__enabled:
return
self._get_timer(name).unpause()

    def stop_timer(self, name: str) -> Union[float, Any]:
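        """Stops the timer of the given name and returns its elapsed time.
        If benchmarking is disabled, returns the fallback value
        :py:attr:`._TIMER_FALLBACK_RV` instead."""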
if not self.__enabled:
return self._TIMER_FALLBACK_RV
return self._get_timer(name).stop()

    # .. Retrieving timer data ................................................

    @property
    def elapsed(self) -> Dict[str, float]:
        """The elapsed times of all timers"""
        return {k: t.elapsed for k, t in self._timers.items()}

    @property
    def elapsed_cumulative(self) -> Dict[str, float]:
        """The elapsed times of all cumulative timers"""
        return {
            k: t.elapsed for k, t in self._timers.items() if not t.one_shot
        }

    @property
    def elapsed_one_shot(self) -> Dict[str, float]:
        """The elapsed times of all one-shot timers"""
        return {k: t.elapsed for k, t in self._timers.items() if t.one_shot}

    @property
    def elapsed_info(self) -> str:
"""Prepares a formatted string with all elapsed times"""
return "\n".join(
self._time_elapsed_info_fstr.format(
name=name,
seconds=seconds,
time_str=_format_time(seconds, ms_precision=2),
)
for name, seconds in self.elapsed.items()
)

    # .. Storing timer data ...................................................

    def _write_dset_total(self):
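        """Writes the current total elapsed times of all timers to the
        ``total`` dataset, creating that dataset on first invocation."""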
if not self.__enabled or not self.__write:
return
elapsed = self.elapsed
N = len(elapsed)
# May still need to create the dataset
if self._dset_total is None:
ds = self._dgrp_bench.create_dataset(
"total",
(N,),
maxshape=(N,),
chunks=True,
compression=self.__dset_compression,
dtype=self.__dset_dtype,
)
ds.attrs["dim_names"] = ["label"]
ds.attrs["coords_mode__label"] = "values"
ds.attrs["coords__label"] = list(elapsed.keys())
self._dset_total = ds
# TODO check what happens if invoked repeatedly, possibly with new
# timers added in between
self._dset_total[:] = list(elapsed.values())

    def _write_dset_cumulative(self):
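        """Appends the current elapsed times of all cumulative timers to the
        ``cumulative`` dataset, creating that dataset on first invocation."""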
if not self.__enabled or not self.__write:
return
elapsed_cumulative = self.elapsed_cumulative
N = len(elapsed_cumulative)
# May still need to create it
if self._dset_cumulative is None:
ds = self._dgrp_bench.create_dataset(
"cumulative",
(0, N),
maxshape=(None, N),
chunks=True,
compression=self.__dset_compression,
dtype=self.__dset_dtype,
)
ds.attrs["dim_names"] = ["n_iterations", "label"]
if not self._is_stepwise_model:
                # As constantly updating the write-times attribute would be
                # too costly, use trivial indices for now; at the very end of
                # the run, the attribute is updated from the list of
                # invocation times (in numbers of iterations) that is built
                # up in the meantime.
ds.attrs["coords_mode__n_iterations"] = "trivial"
else:
ds.attrs["coords_mode__n_iterations"] = "start_and_step"
_sas = [self.write_start, self.write_every]
ds.attrs["coords__n_iterations"] = _sas
ds.attrs["coords_mode__label"] = "values"
ds.attrs["coords__label"] = list(elapsed_cumulative.keys())
self._dset_cumulative = ds
self._dset_cumulative_invocation_times = []
# May need to expand size along time dimension
ds = self._dset_cumulative
ds.resize(ds.shape[0] + 1, axis=0)
# Now write:
ds[-1, :] = list(elapsed_cumulative.values())
# Extend list of write times (to be written to attribute at the end
# of the run)
if not self._is_stepwise_model:
self._dset_cumulative_invocation_times.append(self.n_iterations)

    # .. Inject into simulation procedure .....................................
    # Note that __init__ also starts and stops some timers

    def _invoke_setup(self):
self.start_timer("setup")
self._bench_cfg = self.cfg.pop("benchmark", {})
super()._invoke_setup()
self.stop_timer("setup")

    def _pre_run(self):
self.start_timer("run")
super()._pre_run()

    def _post_run(self, *, finished_run: bool):
super()._post_run(finished_run=finished_run)
# Stop all remaining timers
self.stop_timer("run")
for timer in self.timers.values():
if timer.finished or timer.name == "teardown":
continue
self.stop_timer(timer.name)
# Write total values for all timers
self._write_dset_total()
# Ensure that coordinate labels for n_iterations are stored
if (
self.__enabled
and self.__write
and self._dset_cumulative is not None
):
ds = self._dset_cumulative
times = self._dset_cumulative_invocation_times
ds.attrs["coords_mode__n_iterations"] = "values"
ds.attrs["coords__n_iterations"] = times
# Show times
if self.__enabled and self._show_on_exit:
self.log.info(
"Elapsed times for parts of this simulation:\n\n%s\n",
self.elapsed_info,
)

    def _invoke_prolog(self):
self.start_timer("prolog")
super()._invoke_prolog()
self.stop_timer("prolog")

    def _pre_iterate(self):
self.unpause_timer("full_iteration")
super()._pre_iterate()

    def _invoke_iterate(self):
self.unpause_timer("model_iteration")
super()._invoke_iterate()
self.pause_timer("model_iteration")

    def _pre_monitor(self):
self.unpause_timer("monitor")
super()._pre_monitor()

    def _emit_monitor(self):
if self._add_time_elapsed_to_monitor_info:
self._monitor_info["timers"] = self.elapsed
super()._emit_monitor()

    def _post_monitor(self):
super()._post_monitor()
self.pause_timer("monitor")

    def _invoke_write_data(self):
self.unpause_timer("write_data")
super()._invoke_write_data()
self._write_dset_cumulative()
self.pause_timer("write_data")

    def _post_iterate(self):
super()._post_iterate()
self.pause_timer("full_iteration")

    def _invoke_epilog(self, **kwargs):
self.start_timer("epilog")
super()._invoke_epilog(**kwargs)
self.stop_timer("epilog")

    def __del__(self):
self.start_timer("teardown")
super().__del__()
self.stop_timer("teardown")