"""Implements batch running and evaluation of simulations"""
import logging
import os
import time
from copy import deepcopy as _deepcopy
from shutil import copy2 as _copy2
from typing import Callable, Dict, Sequence, Tuple, Union
from pkg_resources import resource_filename as _resource_filename
from .cfg import UTOPYA_CFG_FILE_PATHS as _UTOPYA_CFG_FILE_PATHS
from .cfg import load_from_cfg_dir as _load_from_cfg_dir
from .reporter import WorkerManagerReporter
from .task import MPProcessTask
from .tools import recursive_update as _recursive_update
from .workermanager import WorkerManager
from .yaml import load_yml as _load_yml
from .yaml import write_yml as _write_yml
# Path to, and content of, the batch task manager configuration shipped with
# the utopya package itself
_BTM_BASE_CFG_PATH = _resource_filename("utopya", "cfg/btm_cfg.yml")
"""Base configuration path of the batch task manager"""

_BTM_BASE_CFG = _load_yml(_BTM_BASE_CFG_PATH)
"""Actual base configuration of the batch task manager"""

# Defaults the user may have stored in the "batch" entry of the config dir
_BTM_USER_DEFAULTS = _load_from_cfg_dir("batch")
"""User defaults for the batch task manager"""

# User defaults take precedence over the base configuration; deep copies are
# used such that neither of the source dicts is mutated by the update.
_BTM_DEFAULTS = _recursive_update(
    _deepcopy(_BTM_BASE_CFG),
    _deepcopy(_BTM_USER_DEFAULTS),
)
"""Aggregated and recursively updated default batch task manager config"""

INVALID_TASK_NAME_CHARS = ("/", ":", ".", "?", "*")
"""Substrings that may not appear in task names"""

log = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# .. Definition of multiprocessing.Process target callables ...................
# These need to be defined here, at module level, because they need to be
# importable from this module; local function definitions would not work.
def _replace_symlink(src: str, dst: str) -> None:
    """Creates a symlink at ``dst`` pointing to ``src``.

    An existing *symlink* at ``dst`` is removed first. Anything else at
    ``dst`` is left untouched, in which case :py:func:`os.symlink` raises.
    """
    if os.path.islink(dst):
        os.remove(dst)
    os.symlink(src, dst)


def _is_within_dir(path: str, parent: str) -> bool:
    """Whether ``path`` is ``parent`` itself or lies somewhere inside it.

    Unlike a plain string-prefix check, this compares whole path components
    of the normalized paths: ``/a/eval_other`` is *not* inside ``/a/eval``,
    and ``/a/eval/../x`` is correctly recognized as lying outside of it.
    """
    path = os.path.normpath(os.path.abspath(path))
    parent = os.path.normpath(os.path.abspath(parent))
    try:
        return os.path.commonpath([path, parent]) == parent
    except ValueError:
        # Paths without any common root (e.g. different drives on Windows)
        # cannot be nested.
        return False


def _create_eval_symlinks(
    *,
    out_dir: str,
    data_dir: str,
    task_name: str,
    batch_name: str,
    batch_dirs: dict,
    task_cfg_path: str,
) -> None:
    """Creates symlinks that crosslink an evaluation task's output directory
    with the evaluated run, the task configuration and the batch run.

    Args:
        out_dir (str): The evaluation output directory of this task
        data_dir (str): The data directory of the evaluated (original) run
        task_name (str): Name of the evaluation task
        batch_name (str): Name of the batch run
        batch_dirs (dict): Directories of the batch run; the ``eval`` and
            ``batch_run`` entries are used here.
        task_cfg_path (str): Path to the backed-up task configuration file

    Raises:
        FileExistsError: From :py:func:`os.symlink`, if a link destination
            that is not replaced here already exists.
    """
    log.progress("Creating symlinks ...")

    # ... back to the run directory, replacing an existing link.
    _replace_symlink(data_dir, os.path.join(out_dir, "_run"))
    log.note("...to original run directory.")

    # ... from the evaluation directory of the original run to the output
    # directory of this batch eval task.
    os.symlink(
        out_dir,
        os.path.join(data_dir, "eval", f"{batch_name}_eval_{task_name}"),
    )
    log.note(
        "...from original run directory back to this evaluation "
        "output directory."
    )

    # ... back to the task configuration file, replacing an existing link.
    _replace_symlink(task_cfg_path, os.path.join(out_dir, "_task_cfg.yml"))
    log.note("...to batch task configuration.")

    # With output directories that are *outside* of the batch evaluation
    # directory, add links back and forth between them:
    if not _is_within_dir(out_dir, batch_dirs["eval"]):
        # ... back to the batch run directory
        _replace_symlink(
            batch_dirs["batch_run"], os.path.join(out_dir, "_batch_run")
        )
        log.note("...to batch run directory.")

        # ... from default eval directory to custom one
        os.symlink(out_dir, os.path.join(batch_dirs["eval"], task_name))
        log.note(
            "...from batch evaluation output directory to custom "
            "output directory."
        )


def _eval_task(
    *,
    task_name: str,
    _batch_name: str,
    _batch_dirs: dict,
    _task_cfg_path: str,
    _create_symlinks: bool,
    model_name: str,
    model_kwargs: dict = None,
    use_data_tree_cache: bool = None,
    print_tree: Union[bool, str] = "condensed",
    plot_only: Sequence[str] = None,
    plots_cfg: str = None,
    update_plots_cfg: dict = None,
    **frozen_mv_kwargs,
):
    """The evaluation task target for the multiprocessing.Process. It sets up
    a :py:class:`utopya.model.Model`, loads the data, and performs plots.

    Args:
        task_name (str): Name of this evaluation task
        _batch_name (str): Name of the batch run this task belongs to
        _batch_dirs (dict): Directories of the batch run; only used if
            symlinks are to be created.
        _task_cfg_path (str): Path to the backed-up task configuration
        _create_symlinks (bool): Whether to create symlinks that crosslink
            related files and directories
        model_name (str): Name of the model whose data is evaluated
        model_kwargs (dict, optional): Further arguments to the ``Model``;
            ``None`` is treated like an empty dict.
        use_data_tree_cache (bool, optional): If true and a tree cache file
            exists, restores the data tree from it. If true and no cache
            exists, loads the data and calls ``dm.dump()`` (presumably
            creating the cache). If false, always loads the data.
        print_tree (Union[bool, str], optional): If ``"condensed"``, prints
            the condensed data tree; if otherwise truthy, prints the full
            tree; if falsy, prints nothing.
        plot_only (Sequence[str], optional): Passed on to ``plot_from_cfg``
        plots_cfg (str, optional): Passed on to ``plot_from_cfg``
        update_plots_cfg (dict, optional): Passed on to ``plot_from_cfg`` as
            additional keyword arguments; ``None`` means no updates.
        **frozen_mv_kwargs: Passed on to ``Model.create_frozen_mv``
    """
    from .model import Model

    # Avoid shared mutable defaults and accept explicit nulls from configs
    if model_kwargs is None:
        model_kwargs = {}
    if update_plots_cfg is None:
        update_plots_cfg = {}

    log.hilight(
        "Setting up evaluation task '%s' for model '%s' ...",
        task_name,
        model_name,
    )
    model = Model(name=model_name, **model_kwargs)
    mv = model.create_frozen_mv(**frozen_mv_kwargs)

    # Create symlinks to improve crosslinking between related files/directories
    if _create_symlinks:
        _create_eval_symlinks(
            out_dir=mv.dm.dirs["out"],
            data_dir=mv.dm.dirs["data"],
            task_name=task_name,
            batch_name=_batch_name,
            batch_dirs=_batch_dirs,
            task_cfg_path=_task_cfg_path,
        )
        print("")

    # Load the data tree, potentially restoring from tree cache
    dm = mv.dm
    if use_data_tree_cache and dm.tree_cache_exists:
        log.hilight("Restoring tree from cache file ...")
        dm.restore()
    else:
        dm.load_from_cfg()
        if use_data_tree_cache:
            # No cache available yet; create one for subsequent evaluations
            dm.dump()

    # Done loading
    if print_tree == "condensed":
        print(dm.tree_condensed)
    elif print_tree:
        print(dm.tree)

    # Start plotting ...
    mv.pm.plot_from_cfg(
        plots_cfg=plots_cfg, plot_only=plot_only, **update_plots_cfg
    )
# -----------------------------------------------------------------------------
class BatchTaskManager:
    """A manager for batch tasks.

    Upon initialization, it assembles the batch configuration, creates a
    timestamped batch run directory, and adds the configured tasks to a
    :py:class:`~utopya.workermanager.WorkerManager`. Currently, only
    evaluation tasks are supported.
    """

    RUN_DIR_TIME_FSTR = "%y%m%d-%H%M%S"
    """The time format string for the run directory"""

    # .........................................................................

    def __init__(self, *, batch_cfg_path: str = None, **update_batch_cfg):
        """Sets up a BatchTaskManager.

        Args:
            batch_cfg_path (str, optional): The batch file with all the task
                definitions.
            **update_batch_cfg: Additional arguments that are used to update
                the batch configuration.

        Raises:
            NotImplementedError: If ``cluster_mode`` is enabled or if any
                run tasks are configured.
            ValueError: On an invalid ``parallelization_level``, a relative
                batch output directory, or an invalid task name.
        """
        log.progress("Initializing BatchTaskManager ...")

        self._cfg = self._setup_batch_cfg(batch_cfg_path, **update_batch_cfg)
        self._timestamp_str = time.strftime(self.RUN_DIR_TIME_FSTR)
        self._dirs, self._name = self._setup_dirs(**self._cfg["paths"])
        log.note("Batch run directory:\n %s", self.dirs["batch_run"])

        self._perform_backup(
            base_cfg=_BTM_BASE_CFG_PATH,
            user_cfg=_UTOPYA_CFG_FILE_PATHS.get("batch"),
            batch_file=batch_cfg_path,
            update_cfg=update_batch_cfg,
            batch_cfg=self._cfg,
        )

        # Some features are not yet implemented ...
        if self._cfg["cluster_mode"]:
            raise NotImplementedError("Cluster mode is not supported yet!")

        # Set up the WorkerManager and its reporter
        self._wm = WorkerManager(**self._cfg["worker_manager"])
        self._reporter = WorkerManagerReporter(
            self._wm,
            report_dir=self.dirs["batch_run"],
            **self._cfg["reporter"],
        )

        log.progress("Initialized BatchTaskManager.")
        log.note(" Parallelization level: %s", self.parallelization_level)
        log.note(" Debug mode? %s", self.debug)

        # Register tasks; the counts only include tasks actually added
        log.progress("Adding tasks ...")
        self._n_run = self._add_tasks(
            tasks=self._cfg["tasks"]["run"],
            defaults=self.run_defaults,
            add_task=self._add_run_task,
        )
        self._n_eval = self._add_tasks(
            tasks=self._cfg["tasks"]["eval"],
            defaults=self.eval_defaults,
            add_task=self._add_eval_task,
        )
        log.success(
            "Added %d run and %d evaluation tasks ...",
            self._n_run,
            self._n_eval,
        )

    def __str__(self) -> str:
        return f"<BatchTaskManager '{self._name}'>"

    # .........................................................................

    @property
    def debug(self) -> bool:
        """Whether debug mode was enabled."""
        return self._cfg["debug"]

    @property
    def parallelization_level(self) -> str:
        """The parallelization level, either ``batch`` or ``task``."""
        return self._cfg["parallelization_level"]

    @property
    def run_defaults(self) -> dict:
        """A deepcopy of the run task defaults"""
        return _deepcopy(self._cfg["task_defaults"]["run"])

    @property
    def eval_defaults(self) -> dict:
        """A deepcopy of the eval task defaults"""
        return _deepcopy(self._cfg["task_defaults"]["eval"])

    @property
    def dirs(self) -> dict:
        """The directories associated with this BatchTaskManager"""
        return self._dirs

    # .........................................................................

    @staticmethod
    def _setup_batch_cfg(batch_cfg_path: str, **update_batch_cfg) -> dict:
        """Sets up the BatchTaskManager configuration.

        The configuration is assembled from (in increasing precedence) the
        aggregated defaults, the batch file at ``batch_cfg_path`` (if given),
        and ``update_batch_cfg``. Afterwards, debug mode and the
        parallelization level are applied to the relevant sub-configurations.

        Raises:
            ValueError: On an invalid ``parallelization_level``
        """
        # Update defaults with configuration from file, if given
        batch_cfg = _load_yml(batch_cfg_path) if batch_cfg_path else {}
        batch_cfg = _recursive_update(_deepcopy(_BTM_DEFAULTS), batch_cfg)

        # Update again
        batch_cfg = _recursive_update(batch_cfg, _deepcopy(update_batch_cfg))

        # For debug mode, let the WorkerManager raise directly rather than only
        # issuing warnings (default).
        if batch_cfg["debug"]:
            batch_cfg["worker_manager"]["nonzero_exit_handling"] = "raise"

        # Evaluate parallelization level
        plevel = batch_cfg["parallelization_level"]
        if plevel == "task":
            # Work on one task at a time and forward its output streams
            batch_cfg["worker_manager"]["num_workers"] = 1
            batch_cfg["worker_kwargs"]["forward_streams"] = True

        elif plevel == "batch":
            # Disable parallelism within the individual run tasks
            task_defaults = batch_cfg["task_defaults"]
            task_defaults["run"] = _recursive_update(
                task_defaults["run"],
                dict(
                    worker_manager=dict(num_workers=1),
                    parameter_space=dict(
                        parallel_execution=dict(enabled=False)
                    ),
                ),
            )

        else:
            raise ValueError(
                f"Invalid parallelization_level '{plevel}'! "
                "Valid options are: batch, task"
            )

        log.info("Loaded batch configuration.")
        return batch_cfg

    def _setup_dirs(
        self, out_dir: str, note: str = None
    ) -> Tuple[Dict[str, str], str]:
        """Sets up the batch run directory and its subdirectories.

        The batch run directory is named after the timestamp, optionally
        followed by ``_<note>``, and is created inside ``out_dir``.

        Args:
            out_dir (str): The (absolute, ``~`` is expanded) directory in
                which the batch run directory is created.
            note (str, optional): Suffix for the batch run directory name

        Returns:
            Tuple[Dict[str, str], str]: The directory paths (keys ``out``,
                ``batch_run``, ``config``, ``config/tasks``, ``eval``,
                ``logs``) and the name of the batch run directory.

        Raises:
            ValueError: If ``out_dir`` is not an absolute path
            FileExistsError: From :py:func:`os.makedirs`, if the batch run
                directory already exists.
        """
        out_dir = os.path.expanduser(out_dir)
        if not os.path.isabs(out_dir):
            raise ValueError(
                f"Batch output directory needs to be absolute! Was: {out_dir}"
            )

        # Batch run directory name: timestamp and optional note
        batch_run_dir_name = self._timestamp_str
        if note:
            batch_run_dir_name += f"_{note}"
        batch_run_dir = os.path.join(out_dir, batch_run_dir_name)

        # Create batch run directory; no exist_ok, so existing ones are not
        # reused.
        dirs = dict(out=out_dir, batch_run=batch_run_dir)
        os.makedirs(batch_run_dir)

        # Subdirectories
        for subdir in ("config", "config/tasks", "eval", "logs"):
            dirs[subdir] = os.path.join(batch_run_dir, subdir)
            os.makedirs(dirs[subdir])

        return dirs, batch_run_dir_name

    # .........................................................................

    def _add_tasks(
        self, tasks: dict, defaults: dict, add_task: Callable
    ) -> int:
        """Adds all given tasks via the ``add_task`` callable.

        Each task configuration is recursively updated onto a deep copy of
        the ``defaults`` before being passed to ``add_task``.

        Args:
            tasks (dict): Task configurations, keyed by task name. ``None``
                is treated like an empty dict.
            defaults (dict): Defaults for each task configuration
            add_task (Callable): Invoked as ``add_task(name, **task_cfg)``

        Returns:
            int: The number of tasks that were actually added. Tasks for
                which ``add_task`` returned ``False`` (e.g. disabled ones)
                are not counted. A return value of ``None`` counts as added.

        Raises:
            ValueError: If a task name contains any of the substrings listed
                in ``INVALID_TASK_NAME_CHARS``.
        """
        num_added = 0
        for task_name, task_cfg in _deepcopy(tasks or {}).items():
            if any(s in task_name for s in INVALID_TASK_NAME_CHARS):
                raise ValueError(
                    f"Invalid task name '{task_name}'! May not contain any of "
                    "the following characters or substrings: "
                    + " ".join(INVALID_TASK_NAME_CHARS)
                )

            task_cfg = _recursive_update(_deepcopy(defaults), task_cfg)
            if add_task(task_name, **task_cfg) is not False:
                num_added += 1

        return num_added

    def _add_run_task(self, name: str, **_):
        """Adds a single run task to the WorkerManager (not supported yet).

        Raises:
            NotImplementedError: Always
        """
        raise NotImplementedError(
            "Run tasks are not supported yet! Remove the corresponding "
            "entries from the BatchTaskManager configuration."
        )

    def _add_eval_task(
        self,
        name: str,
        *,
        model_name: str,
        out_dir: str,
        enabled: bool = True,
        priority: int = None,
        create_symlinks: bool = False,
        **eval_task_kwargs,
    ) -> bool:
        """Adds a single evaluation task to the WorkerManager.

        Args:
            name (str): Name of this task
            model_name (str): Model name; required in task, thus already
                requiring it here.
            out_dir (str): The path to the DataManager's output directory,
                i.e. the directory where all plots will end up in.
                This may be a format string containing any of the following
                keys: ``task_name``, ``model_name``, ``timestamp``,
                ``batch_name`` (combination of ``timestamp`` and the note).
                Relative paths are evaluated relative to the ``eval`` batch
                run directory.
            enabled (bool, optional): If False, will *not* add this task.
            priority (int, optional): Task priority; tasks with smaller value
                will be picked first.
            create_symlinks (bool, optional): Whether to create symlinks that
                add crosslinks between related directories, e.g.: from the
                output directory, link back to the task configuration; from
                the evaluation output directory alongside the simulation
                data, link to the batch output directory
            **eval_task_kwargs: All further evaluation task arguments.

        Returns:
            bool: True if the task was added, False if it was skipped because
                it was not enabled.
        """
        if not enabled:
            log.debug("Task '%s' was not enabled. Skipping.", name)
            return False

        def setup_eval_task(
            worker_kwargs: dict,
            model_name: str,
            out_dir: str,
            create_symlinks: bool,
            eval_task_kwargs: dict,
        ) -> dict:
            """Run before the task starts; sets up all arguments for it ..."""
            # Prepare the DataManager's output directory path
            out_dir = out_dir.format(
                task_name=name,
                model_name=model_name,
                timestamp=self._timestamp_str,
                batch_name=self._name,
            )
            out_dir = os.path.expanduser(out_dir)
            if not os.path.isabs(out_dir):
                out_dir = os.path.join(self.dirs["eval"], out_dir)

            # Update the task arguments accordingly, setting the DataManager's
            # output directory and adding metadata
            eval_task_kwargs = _recursive_update(
                eval_task_kwargs,
                dict(
                    _batch_name=self._name,
                    task_name=name,
                    model_name=model_name,
                    data_manager=dict(out_dir=out_dir),
                ),
            )

            # Backup the configuration, such that the task can create a
            # symlink back to it. This needs to be done by the task, because
            # the DataManager will create the output directory and will fail
            # if it already exists (which is good).
            task_cfg_path = os.path.join(
                self.dirs["config/tasks"], f"eval_{name}.yml"
            )
            _write_yml(eval_task_kwargs, path=task_cfg_path)

            # These are added only after writing the backup
            eval_task_kwargs["_batch_dirs"] = self.dirs
            eval_task_kwargs["_task_cfg_path"] = task_cfg_path
            eval_task_kwargs["_create_symlinks"] = create_symlinks

            # Generate a new worker_kwargs dict, carrying over the given ones
            worker_kwargs = dict(
                args=(_eval_task,),
                read_stdout=True,
                stdout_parser="yaml_dict",
                **worker_kwargs,
            )
            # Do not require popen_kwargs to be pre-configured
            worker_kwargs.setdefault("popen_kwargs", {})[
                "kwargs"
            ] = eval_task_kwargs

            # If the streams are to be saved, save them to the logs output
            # directory inside the batch directory.
            if worker_kwargs.get("save_streams", True):
                worker_kwargs["save_streams_to"] = os.path.join(
                    self.dirs["logs"], f"eval_{name}_" + "{name:}.log"
                )

            return worker_kwargs

        # Determine setup and worker kwargs
        worker_kwargs = _deepcopy(self._cfg["worker_kwargs"])
        setup_kwargs = dict(
            model_name=model_name,
            out_dir=out_dir,
            eval_task_kwargs=eval_task_kwargs,
            create_symlinks=create_symlinks,
        )

        # Add the task
        self._wm.add_task(
            TaskCls=MPProcessTask,
            name=name,
            priority=priority,
            setup_func=setup_eval_task,
            setup_kwargs=setup_kwargs,
            worker_kwargs=worker_kwargs,
        )
        return True