"""Implementation of the :py:class:`~utopya.multiverse.Multiverse` class which
sits at the heart of utopya and supplies the main user interface for the
frontend. It allows to run a simulation and then evaluate it.
"""
import copy
import itertools
import logging
import os
import re
import time
import warnings
from collections import defaultdict
from shutil import copy2
from tempfile import TemporaryDirectory
from typing import List, Tuple, Union
import paramspace as psp
from pkg_resources import resource_filename
from ._cluster import parse_node_list
from .cfg import get_cfg_path as _get_cfg_path
from .eval import DataManager, PlotManager
from .model_registry import ModelInfoBundle, get_info_bundle, load_model_cfg
from .parameter import ValidationError
from .project_registry import PROJECTS
from .reporter import WorkerManagerReporter
from .task import NoWorkTask, WorkerTask
from .tools import parse_num_steps, pformat, recursive_update
from .workermanager import WorkerManager
from .yaml import load_yml, write_yml
log = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
[docs]class Multiverse:
"""The Multiverse is where a single simulation run is orchestrated from.
It spawns multiple universes, each of which represents a single simulation
of the selected model with the parameters specified by the meta
configuration.
The :py:class:`~utopya.workermanager.WorkerManager` takes care to perform
these simulations in parallel.
The :py:class:`.Multiverse` then interfaces with the :py:mod:`dantro` data
processing pipeline using classes specialized in :py:mod:`utopya.eval`:
The :py:class:`~utopya.eval.datamanager.DataManager` loads the created
simulation output, making it available in a uniformly accessible and
hierarchical data tree.
Then, the :py:class:`~utopya.eval.plotmanager.PlotManager` handles
plotting of that data.
"""
BASE_META_CFG_PATH = resource_filename("utopya", "cfg/base_cfg.yml")
"""Where the default meta configuration can be found"""
USER_CFG_SEARCH_PATH = _get_cfg_path("user")
"""Where to look for the user configuration"""
RUN_DIR_TIME_FSTR = "%y%m%d-%H%M%S"
"""The time format string for the run directory"""
UTOPYA_BASE_PLOTS_PATH = resource_filename("utopya", "cfg/base_plots.yml")
"""Where the utopya base plots configuration can be found; this is passed
to the :py:class:`~utopya.eval.plotmanager.PlotManager`.
"""
[docs] def __init__(
self,
*,
model_name: str = None,
info_bundle: ModelInfoBundle = None,
run_cfg_path: str = None,
user_cfg_path: str = None,
_shared_worker_manager: WorkerManager = None,
**update_meta_cfg,
):
"""Initialize the Multiverse.
Args:
model_name (str, optional): The name of the model to run
info_bundle (ModelInfoBundle, optional): The model information
bundle that includes information about the executable path etc.
If not given, will attempt to read it from the model registry.
run_cfg_path (str, optional): The path to the run configuration.
user_cfg_path (str, optional): If given, this is used to update the
base configuration. If None, will look for it in the default
path, see Multiverse.USER_CFG_SEARCH_PATH.
_shared_worker_manager (WorkerManager, optional): If given, this
already existing WorkerManager instance (and its reporter)
will be used instead of initializing new instances.
.. warning::
This argument is only exposed for internal purposes.
It should not be used for production code and behavior of
this argument may change at any time.
**update_meta_cfg: Can be used to update the meta configuration
generated from the previous configuration levels
"""
# First things first: get the info bundle
if info_bundle is None:
info_bundle = get_info_bundle(
model_name=model_name, info_bundle=info_bundle
)
self._info_bundle = info_bundle
log.progress(
"Initializing Multiverse for '%s' model ...", self.model_name
)
# Setup property-managed attributes
self._dirs = dict()
self._model_executable = None
self._tmpdir = None
self._resolved_cluster_params = None
# Create meta configuration and list of used config files
mcfg, cfg_parts = self._create_meta_cfg(
run_cfg_path=run_cfg_path,
user_cfg_path=user_cfg_path,
update_meta_cfg=update_meta_cfg,
)
self._meta_cfg = mcfg
log.info("Built meta configuration.")
log.remark(" Debug level: %d", self.debug_level)
self._apply_debug_level()
# In cluster mode, need to make some adjustments via additional dicts
dm_cluster_kwargs = dict()
wm_cluster_kwargs = dict()
if self.cluster_mode:
log.note("Cluster mode enabled.")
self._resolved_cluster_params = self._resolve_cluster_params()
rcps = self.resolved_cluster_params # creates a deep copy
log.note(
"This is node %d of %d.",
rcps["node_index"] + 1,
rcps["num_nodes"],
)
# Changes to the meta configuration
# To avoid config file collisions in the PlotManager:
self._meta_cfg["plot_manager"]["cfg_exists_action"] = "skip"
# _Additional_ arguments to pass to *Manager initializations below
# ... for DataManager
timestamp = rcps["timestamp"]
dm_cluster_kwargs = dict(
out_dir_kwargs=dict(timestamp=timestamp, exist_ok=True)
)
# ... for WorkerManager
wm_cluster_kwargs = dict(
cluster_mode=True, resolved_cluster_params=rcps
)
# Create the run directory and write the meta configuration into it.
self._create_run_dir(**self.meta_cfg["paths"])
log.note("Run directory:\n %s", self.dirs["run"])
# Backup involved files, if not in cluster mode or on the relevant node
if (
not self.cluster_mode
or self.resolved_cluster_params["node_index"] == 0
):
# If not in cluster mode, should backup in any case.
# In cluster mode, the first node is responsible for backing up
# the configuration; all others can relax.
self._perform_backup(
**self.meta_cfg["backups"], cfg_parts=cfg_parts
)
else:
log.debug(
"Not backing up config files, because it was already "
"taken care of by the first node."
)
# NOTE Not taking a try-except approach here because it might get
# messy when multiple nodes try to backup the configuration
# at the same time ...
# Validate the parameters specified in the meta configuration
self._validate_meta_cfg()
# Prepare the executable
self._prepare_executable(**self.meta_cfg["executable_control"])
# Create a DataManager instance
self._dm = DataManager(
self.dirs["run"],
name=f"{self.model_name}_data",
**self.meta_cfg["data_manager"],
**dm_cluster_kwargs,
)
log.progress("Initialized DataManager.")
# Either create a WorkerManager instance and its associated reporter
# or use an already existing WorkerManager that is also used elsewhere
if not _shared_worker_manager:
self._wm = WorkerManager(
**self.meta_cfg["worker_manager"], **wm_cluster_kwargs
)
self._reporter = WorkerManagerReporter(
self.wm,
mv=self,
report_dir=self.dirs["run"],
**self.meta_cfg["reporter"],
)
else:
self._wm = _shared_worker_manager
self._reporter = self.wm.reporter
log.info("Using a shared WorkerManager instance and reporter.")
# And instantiate the PlotManager with the model-specific plot config
self._pm = self._setup_pm()
log.progress("Initialized Multiverse.\n")
# Properties ..............................................................
@property
def debug_level(self) -> int:
"""The debug level"""
return self.meta_cfg.get("debug_level", 0)
@property
def info_bundle(self) -> ModelInfoBundle:
"""The model info bundle for this Multiverse"""
return self._info_bundle
@property
def model_name(self) -> str:
"""The model name associated with this Multiverse"""
return self.info_bundle.model_name
@property
def model_executable(self) -> str:
"""The path to the model executable"""
if self._model_executable is not None:
# Use the executable from a temporary directory
return self._model_executable
execpath = self.info_bundle.executable
if not execpath:
raise ValueError(
f"Model '{self.model_name}' does not have an executable "
"registered that can be used to perform a simulation run!\n"
"If you want to run simulations, specify an executable. "
"In case you only want to use utopya's evaluation routines, "
"no executable is needed, but only the evaluation pipeline is "
"available and it seems like you tried to run a simulation."
)
return execpath
@property
def model(self) -> "utopya.model.Model":
"""A model instance, created ad-hoc using the associated info bundle"""
from .model import Model
return Model(info_bundle=self.info_bundle)
@property
def meta_cfg(self) -> dict:
"""The meta configuration."""
return self._meta_cfg
@property
def dirs(self) -> dict:
"""Information on managed directories."""
return self._dirs
@property
def cluster_mode(self) -> bool:
"""Whether the Multiverse should run in cluster mode"""
return self.meta_cfg["cluster_mode"]
@property
def cluster_params(self) -> dict:
"""Returns a copy of the cluster mode configuration parameters"""
return copy.deepcopy(self.meta_cfg["cluster_params"])
@property
def resolved_cluster_params(self) -> dict:
"""Returns a copy of the cluster configuration with all parameters
resolved. This makes some additional keys available on the top level.
"""
# Return the cached value as a _copy_ to secure it against changes
return copy.deepcopy(self._resolved_cluster_params)
@property
def dm(self) -> DataManager:
"""The Multiverse's DataManager."""
return self._dm
@property
def wm(self) -> WorkerManager:
"""The Multiverse's WorkerManager."""
return self._wm
@property
def pm(self) -> PlotManager:
"""The Multiverse's PlotManager."""
return self._pm
# Public methods ..........................................................
[docs] def run(self, *, sweep: bool = None):
"""Starts a simulation run.
Specifically, this method adds simulation tasks to the associated
WorkerManager, locks its task list, and then invokes the
:py:meth:`~utopya.workermanager.WorkerManager.start_working` method
which performs all the simulation tasks.
If cluster mode is enabled, this will split up the parameter space into
(ideally) equally sized parts and only run one of these parts,
depending on the cluster node this Multiverse is being invoked on.
.. note::
As this method locks the task list of the
:py:class:`~utopya.workermanager.WorkerManager`, no further tasks
can be added henceforth. This means, that each Multiverse instance
can only perform a single simulation run.
Args:
sweep (bool, optional): Whether to perform a sweep or not. If None,
the value will be read from the ``perform_sweep`` key of the
meta-configuration.
"""
log.info("Preparing for simulation run ...")
# Add tasks, then prevent adding further tasks to disallow further runs
self._add_sim_tasks(sweep=sweep)
self.wm.tasks.lock()
# Tell the WorkerManager to start working (is a blocking call)
self.wm.start_working(**self.meta_cfg["run_kwargs"])
# Done! :)
log.success("Finished simulation run. Wohoo. :)\n")
[docs] def run_single(self):
"""Runs a single simulation using the parameter space's default value.
See :py:meth:`~utopya.multiverse.Multiverse.run` for more information.
"""
return self.run(sweep=False)
[docs] def run_sweep(self):
"""Runs a parameter sweep.
See :py:meth:`~utopya.multiverse.Multiverse.run` for more information.
"""
return self.run(sweep=True)
[docs] def renew_plot_manager(self, **update_kwargs):
"""Tries to set up a new PlotManager. If this succeeds, the old one is
discarded and the new one is associated with this Multiverse.
Args:
**update_kwargs: Passed on to PlotManager.__init__
"""
try:
pm = self._setup_pm(**update_kwargs)
except Exception as exc:
raise ValueError(
"Failed setting up a new PlotManager! "
"The old PlotManager remains."
) from exc
self._pm = pm
# Helpers .................................................................
[docs] def _apply_debug_level(self, lvl: int = None):
"""Depending on the debug level, applies certain settings to the
Multiverse and the runtime environment.
.. note::
This does *not* (yet) set the corresponding debug flags for the
``PlotManager``, ``DataManager``, or ``WorkerManager``!
"""
lvl = lvl if lvl is not None else self.debug_level
if lvl >= 2:
warnings.simplefilter("always", DeprecationWarning)
[docs] def _create_run_dir(
self,
*,
out_dir: str,
model_note: str = None,
dir_permissions: dict = None,
) -> None:
"""Create the folder structure for the run output.
For the chosen model name and current timestamp, the run directory
will be of form <timestamp>_<model_note> and be part of the following
directory tree:
::
utopya_output
model_a
180301-125410_my_model_note
config
data
uni000
uni001
...
eval
model_b
180301-125412_my_first_sim
180301-125413_my_second_sim
If running in cluster mode, the cluster parameters are resolved and
used to determine the name of the simulation. The pattern then does not
include a timestamp as each node might return not quite the same value.
Instead, a value from an environment variable is used.
The resulting path can have different forms, depending on which
environment variables were present; required parts are denoted by a
``*`` in the following pattern; if the value of the other entries is
not available, the connecting underscore will not be used:
::
{timestamp}_{job id*}_{cluster}_{job account}_{job name}_{note}
Args:
out_dir (str): The base output directory, where all Utopia output
is stored.
model_note (str, optional): The note to add to the run directory
of the current run.
dir_permissions (Dict[str, str]): If given, will set directory
permissions on the specified managed directories of this
Multiverse. The keys of this dict should be entries of the
:py:attr:`.dirs` attribute, values should be octal permissions
values given as a string.
Raises:
RuntimeError: If the simulation directory already existed. This
should not occur, as the timestamp is unique. If it occurs,
you either started two simulations very close to each other or
something is seriously wrong. Strange time zone perhaps?
"""
# Define a list of format string parts, starting with timestamp
fstr_parts = ["{timestamp:}"]
# Add respective information, depending on mode
if not self.cluster_mode:
# Available information is only the timestamp and the model note
fstr_kwargs = dict(
timestamp=time.strftime(self.RUN_DIR_TIME_FSTR),
model_note=model_note,
)
else:
# In cluster mode, need to resolve cluster parameters first
rcps = self.resolved_cluster_params
# Now, gather all information for the format string that will
# determine the name of the output directory. Make all the info
# available that was supplied from environment variables
fstr_kwargs = {
k: v for k, v in rcps.items() if k not in ("custom_out_dir",)
}
# Parse timestamp and model note separately
timestr = time.strftime(
self.RUN_DIR_TIME_FSTR, time.gmtime(rcps["timestamp"])
)
fstr_kwargs["timestamp"] = timestr # overwrites existing
fstr_kwargs["model_note"] = model_note # may be None
# Add the additional run dir format string parts; its the user's
# responsibility to supply something reasonable here.
if self.cluster_params.get("additional_run_dir_fstrs"):
fstr_parts += self.cluster_params["additional_run_dir_fstrs"]
# Now, also allow a custom output directory
if rcps.get("custom_out_dir"):
out_dir = rcps["custom_out_dir"]
# Have the model note as suffix
if model_note:
fstr_parts += ["{model_note:}"]
# fstr_parts and fstr_kwargs ready now. Carry out the format operation.
fstr = "_".join(fstr_parts)
run_dir_name = fstr.format(**fstr_kwargs)
log.debug("Determined run directory name: %s", run_dir_name)
# Parse the output directory, then build the run directory path
log.debug("Creating path for run directory inside %s ...", out_dir)
out_dir = os.path.expanduser(str(out_dir))
run_dir = os.path.join(out_dir, self.model_name, run_dir_name)
log.debug("Built run directory path: %s", run_dir)
self.dirs["run"] = run_dir
# ... and create it. In cluster mode, it may already exist.
try:
os.makedirs(run_dir, exist_ok=self.cluster_mode)
except OSError as err:
raise RuntimeError(
"Simulation directory already exists. This "
"should not have happened and is probably due "
"to two simulations having been started at "
"almost the same time. Try to start the "
"simulation again or add a unique model note."
) from err
log.debug("Created run directory.")
# Create the subfolders that are always assumed to be present
for subdir in ("config", "data", "eval"):
subdir_path = os.path.join(run_dir, subdir)
os.makedirs(subdir_path, exist_ok=self.cluster_mode)
self.dirs[subdir] = subdir_path
log.debug("Created subdirectories: %s", self._dirs)
# May want to adapt directory permissions
if not dir_permissions:
return
for dirname, mode in dir_permissions.items():
if mode is None:
continue
mode = int(str(mode), 8)
log.debug(
"Setting permissions on %s directory to %s ...",
dirname,
oct(mode),
)
os.chmod(self.dirs[dirname], mode)
[docs] def _get_run_dir(self, *, out_dir: str, run_dir: str, **__):
"""Helper function to find the run directory from arguments given
to :py:meth:`~utopya.multiverse.Multiverse.__init__`.
This is not actually used in :py:class:`~utopya.multiverse.Multiverse`
but in :py:class:`~utopya.multiverse.FrozenMultiverse` and
:py:class:`~utopya.multiverse.DistributedMultiverse`.
Args:
out_dir (str): The output directory
run_dir (str): The run directory to use
``**__``: ignored
Raises:
IOError: No directory found to use as run directory
TypeError: When run_dir was not a string
"""
PATTERN = r"\d{6}-\d{6}_?.*"
# Create model directory path (where the to-be-loaded data is expected)
out_dir = os.path.expanduser(str(out_dir))
model_dir = os.path.join(out_dir, self.model_name)
if not os.path.isdir(model_dir):
# Just create it, there's no harm in that ...
os.makedirs(model_dir)
# Distinguish different types of values for the run_dir argument
if run_dir is None:
log.info("Trying to identify the most recent run directory ...")
# Create list of _directories_ matching timestamp pattern
dirs = [
d
for d in sorted(os.listdir(model_dir))
if os.path.isdir(os.path.join(model_dir, d))
and re.match(PATTERN, os.path.basename(d))
]
if not dirs:
raise FileNotFoundError(
"Could not find a run directory to load for evaluation "
f"of model '{self.model_name}'!\n"
f"Model output directory: {model_dir}\n\n"
"Did you perform a simulation yet? If you are using this "
"model only for evaluation, place your data in a new "
"subdirectory in that folder (using timestamp as name)."
)
# Use the latest to choose the run directory
run_dir = os.path.join(model_dir, dirs[-1])
elif isinstance(run_dir, str):
run_dir = os.path.expanduser(run_dir)
# Distinguish absolute and relative paths and those starting with
# a timestamp-like pattern, which can be looked up from the model
# directory.
if os.path.isabs(run_dir):
log.debug("Received absolute run_dir, using that one.")
elif re.match(PATTERN, run_dir):
# Looks like a relative path within the model directory
log.info(
"Received timestamp '%s' for run_dir; trying to find "
"one within the model output directory ...",
run_dir,
)
run_dir = os.path.join(model_dir, run_dir)
else:
# Is not an absolute path and not a timestamp; thus a path
# relative to the current working directory
run_dir = os.path.join(os.getcwd(), run_dir)
else:
raise TypeError(
"Argument run_dir needs to be None, an absolute "
"path, or a path relative to the model output "
f"directory, but it was: {run_dir}"
)
# Check if the directory exists
if not os.path.isdir(run_dir):
raise OSError(f"No run directory found at '{run_dir}'!")
# Store the path and associate the subdirectories
self.dirs["run"] = run_dir
for subdir in ("config", "eval", "data"):
subdir_path = os.path.join(run_dir, subdir)
if not os.path.exists(subdir_path):
raise FileNotFoundError(
"Missing '%s' subdirectory in `run_dir` %s!",
subdir,
run_dir,
)
self.dirs[subdir] = subdir_path
[docs] def _setup_pm(self, **update_kwargs) -> PlotManager:
"""Helper function to setup a PlotManager instance"""
pm_kwargs = copy.deepcopy(self.meta_cfg["plot_manager"])
if update_kwargs:
pm_kwargs = recursive_update(pm_kwargs, update_kwargs)
base_cfg_pools = pm_kwargs.pop(
"base_cfg_pools", ["utopya_base", "model_base"]
)
log.info("Initializing PlotManager ...")
pm = PlotManager(
dm=self.dm,
_model_info_bundle=self.info_bundle,
default_plots_cfg=self.info_bundle.paths.get("default_plots"),
base_cfg_pools=self._parse_base_cfg_pools(base_cfg_pools),
**pm_kwargs,
)
log.progress("Initialized PlotManager.")
log.note(
"Available base plot configuration pools:\n %s",
", ".join(pm.base_cfg_pools.keys()),
)
log.note(
"Output directory: %s",
pm._out_dir if pm._out_dir else "\n " + self.dm.dirs["out"],
)
return pm
[docs] def _parse_base_cfg_pools(
self, base_cfg_pools: List[Union[str, Tuple[str, Union[str, dict]]]]
) -> List[Tuple[str, Union[str, dict]]]:
"""Prepares the ``base_cfg_pools`` argument to be valid input to the
PlotManager. This method resolves format strings and thus allows to
more generically define base config pools.
Possible formats for each entry of ``base_cfg_pools`` argument are:
- A 2-tuple ``(name, pool dict)`` which specifies the name of the
base config pool alongside with the pool entries.
- A 2-tuple ``(name, path to pool config file)``, which is later
loaded by the PlotManager
- A shortcut key which resolves to the corresponding 2-tuple.
Available shortcuts are: ``utopya_base``, ``framework_base``,
``project_base``, and ``model_base``.
Both the pool name and path may be format strings which get resolved
with the ``model_name`` key and (in the case of the path) the full
``paths`` dict of the current model's info bundle. A format string may
look like this:
"{paths[source_dir]}/{model_name}_more_plots.yml"
"~/some/more/plots/{model_name}/plots.yml"
If such a path cannot be resolved, an error is logged and an empty pool
is used instead; this allows for more flexibility in defining locations
for additional config pools.
Args:
base_cfg_pools (List[Union[str, Tuple[str, Union[str, dict]]]]):
The unparsed specification of config pools.
"""
def parse_entry(
entry: Union[str, list, tuple], replacements: dict
) -> Tuple[str, Union[str, dict]]:
"""Unpacks an entry into (name, pool) format and resolves any
remaining format specifiers in the name or pool path.
"""
if isinstance(entry, str):
try:
pool_name, pool = replacements[entry]
except KeyError as err:
_avail = ", ".join(replacements)
raise ValueError(
f"Invalid base config pool shortcut key '{entry}'! "
f"Available shortcuts are: {_avail}. "
"Use one of those or specify the config pool as a "
"2-tuple in form (name, path to pool)."
) from err
else:
pool_name, pool = entry
# Parse pool name and the path to the pool config file
pool_name = pool_name.format(model_name=self.model_name)
if isinstance(pool, str):
pool = pool.format(model_name=self.model_name, paths=paths)
pool = os.path.abspath(os.path.expanduser(pool))
if not os.path.isfile(pool):
log.error(
"No base plot config pool file found at:\n %s", pool
)
log.caution("Using an empty pool instead.")
pool = {}
# Make sure it is either a dict or a string, not a Path-like object
if not isinstance(pool, dict):
pool = str(pool)
return pool_name, pool
if not isinstance(base_cfg_pools, list):
raise TypeError(
"Base config pools need to be specified as a list of "
f"2-tuples or strings, got {type(base_cfg_pools)}!"
)
paths = self.info_bundle.paths
replacements = dict(
utopya_base=("utopya", self.UTOPYA_BASE_PLOTS_PATH),
framework_base=("framework", {}),
project_base=("project", {}),
model_base=("{model_name:}_base", paths.get("base_plots", {})),
)
project = self.info_bundle.project
if project:
fw_name = project.get("framework_name")
if fw_name:
fw_project = PROJECTS[fw_name]
replacements["framework_base"] = (
"framework",
fw_project["paths"].get("project_base_plots"),
)
# Only add a project-level replacement if it is different from the
# framework-level replacement
if self.info_bundle.project_name != fw_name and project[
"paths"
].get("project_base_plots"):
replacements["project_base"] = (
"project",
project["paths"].get("project_base_plots"),
)
return [parse_entry(p, replacements) for p in base_cfg_pools]
[docs] def _prepare_executable(self, *, run_from_tmpdir: bool = False) -> None:
"""Prepares the model executable, potentially copying it to a temporary
location.
Note that ``run_from_tmpdir`` requires the executable to be relocatable
to another location, i.e. be position-independent.
Args:
run_from_tmpdir (bool, optional): Whether to copy the executable
to a temporary directory that goes out of scope once the
Multiverse instance goes out of scope.
Raises:
FileNotFoundError: On missing file at model binary path
PermissionError: On wrong access rights of file at the binary path
"""
execpath = self.model_executable
# Make sure it exists and is executable
if not os.path.isfile(execpath):
raise FileNotFoundError(
"No file found at the specified executable "
f"path for model '{self.model_name}'! "
"If your model needs building, did you build it?\n"
f"Expected file at: {execpath}"
)
elif not os.access(execpath, os.X_OK):
raise PermissionError(
f"The specified executable path for model '{self.model_name}' "
"does not point to an executable file. Did you set the "
f"correct access rights?\n"
"Use the chmod command to mark the file as executable:\n\n"
f" chmod +x {execpath}\n"
)
if run_from_tmpdir:
self._tmpdir = TemporaryDirectory(prefix=self.model_name)
tmp_execpath = os.path.join(
self._tmpdir.name, os.path.basename(execpath)
)
log.info("Copying executable to temporary directory ...")
log.debug(" Original: %s", execpath)
log.debug(" Temporary: %s", tmp_execpath)
copy2(execpath, tmp_execpath)
execpath = tmp_execpath
self._model_executable = execpath
[docs] def _resolve_cluster_params(self) -> dict: # TODO Outsource!
"""This resolves the cluster parameters, e.g. by setting parameters
depending on certain environment variables. This function is called by
the resolved_cluster_params property.
Returns:
dict: The resolved cluster configuration parameters
Raises:
ValueError: If a required environment variable was missing or empty
"""
log.debug("Resolving cluster parameters from environment ...")
# Get a copy of the meta configuration parameters
cps = self.cluster_params
# Determine the environment to use; defaults to os.environ
env = cps.get("env") if cps.get("env") else dict(os.environ)
# Get the mapping of environment variables to target variables
mngr = cps["manager"]
var_map = cps["env_var_names"][mngr]
# Resolve the variables from the environment, requiring them to not
# be empty
resolved = {
target_key: env.get(var_name)
for target_key, var_name in var_map.items()
if env.get(var_name)
}
# Check that all required keys are available
required = ("job_id", "num_nodes", "node_list", "node_name")
if any([var not in resolved for var in required]):
_missing = ", ".join([k for k in required if k not in resolved])
raise ValueError(
f"Missing required environment variable(s): {_missing} ! "
"Make sure that the corresponding environment variables are "
"set and that the mapping is correct!\n"
f" Mapping for manager '{mngr}':\n{pformat(var_map)}\n\n"
f" Full environment:\n{pformat(env)}\n\n"
)
# Now do some postprocessing on some of the values
# Ensure integers
resolved["job_id"] = int(resolved["job_id"])
resolved["num_nodes"] = int(resolved["num_nodes"])
if "num_procs" in resolved:
resolved["num_procs"] = int(resolved["num_procs"])
if "timestamp" in resolved:
resolved["timestamp"] = int(resolved["timestamp"])
# Ensure reproducible node list format: ordered list
parse_mode = self.cluster_params["node_list_parser_params"][mngr]
try:
node_list = parse_node_list(
resolved["node_list"], mode=parse_mode, rcps=resolved
)
except Exception as exc:
raise ValueError(
f"Failed parsing node list {resolved['node_list']} into a "
f"uniform format using parsing mode '{parse_mode}' and "
f"cluster manager '{mngr}'! "
"Check the cluster mode configuration, the relevant "
"environment variables, and the chained error message.\n"
f"Cluster parameters:\n{pformat(self.cluster_params)}\n\n"
f"Parameters resolved so far:\n{pformat(resolved)}"
) from exc
resolved["node_list"] = node_list
# Calculated values, needed in Multiverse.run
# node_index: the offset in the modulo operation
resolved["node_index"] = node_list.index(resolved["node_name"])
# Return the resolved values
log.debug("Resolved cluster parameters:\n%s", pformat(resolved))
return resolved
[docs] def _setup_universe(
self,
*,
worker_kwargs: dict,
model_name: str,
model_executable: str,
uni_cfg: dict,
uni_basename: str,
) -> dict:
"""Setup function for individual universes.
This is called before the worker process starts working on the
universe.
Args:
worker_kwargs (dict): the current status of the worker_kwargs
dictionary; is always passed to a task setup function
model_name (str): The name of the model
model_executable (str): path to the binary to execute
uni_cfg (dict): the configuration to create a yml file from
which is then needed by the model
uni_basename (str): Basename of the universe to use for folder
creation, i.e.: zero-padded universe number, e.g. uni0042
Returns:
dict: kwargs for the process to be run when task is grabbed by
Worker.
"""
# Generate paths
uni_dir = os.path.join(self.dirs["data"], uni_basename)
uni_cfg_path = os.path.join(uni_dir, "config.yml")
# Create universe directory path using the basename
self._setup_universe_dir(uni_dir, uni_basename=uni_basename)
uni_cfg = self._setup_universe_config(
uni_cfg=uni_cfg, uni_dir=uni_dir, uni_cfg_path=uni_cfg_path
)
wk = self._setup_universe_worker_kwargs(
model_executable=model_executable,
uni_dir=uni_dir,
uni_cfg_path=uni_cfg_path,
uni_cfg=uni_cfg,
**worker_kwargs,
)
return wk
[docs] def _setup_universe_dir(self, uni_dir: str, *, uni_basename: str) -> None:
"""Determines the universe directory and, if needed, creates it.
This is invoked from :py:meth:`~._setup_universe` and is carried out
directly before work on that universe starts.
Args:
uni_basename (str): The basename of the universe to create the run
directory for.
"""
os.mkdir(uni_dir)
log.debug("Created universe directory:\n %s", uni_dir)
[docs] def _setup_universe_config(
self,
*,
uni_cfg: dict,
uni_dir: str,
uni_cfg_path: str,
) -> dict:
"""Sets up the universe configuration and writes it to a file.
This is invoked from :py:meth:`~._setup_universe` and is carried out
directly before work on that universe starts.
Args:
uni_cfg (dict): The given universe configuration
uni_dir (str): The universe directory, added to the configuration
uni_cfg_path (str): Where to store the uni configuration at
Returns:
dict: The (potentially updated) universe configuration
"""
# Store output directory and determine a path to the output hdf5 file
uni_cfg["output_dir"] = uni_dir
uni_cfg["output_path"] = os.path.join(uni_dir, "data.h5")
# Parse the potentially string-valued number of steps values, and
# other step-like arguments. Raises an error if they are negative.
uni_cfg["num_steps"] = parse_num_steps(uni_cfg["num_steps"])
uni_cfg["write_every"] = parse_num_steps(uni_cfg["write_every"])
uni_cfg["write_start"] = parse_num_steps(uni_cfg["write_start"])
# Write the universe config to file
write_yml(uni_cfg, path=uni_cfg_path)
return uni_cfg
[docs] def _setup_universe_worker_kwargs(
self,
*,
model_executable: str,
uni_cfg_path: str,
uni_cfg: dict,
uni_dir: str,
save_streams: bool = False,
**worker_kwargs,
) -> dict:
"""Assembles worker kwargs for a specific universe.
This is invoked from :py:meth:`~._setup_universe` and is carried out
directly before work on that universe starts.
Returns:
dict: the combined worker kwargs, including ``args`` for running
the model executable.
"""
# Build args tuple for task assignment; only need to pass the path
# to the configuration file
args = (model_executable, uni_cfg_path)
# Assemble the worker_kwargs dict
wk = dict(
args=args,
read_stdout=True,
stdout_parser="yaml_dict",
save_streams=save_streams,
**worker_kwargs,
)
# Determine where to save the streams to, if enabled
if save_streams:
wk["save_streams_to"] = os.path.join(uni_dir, "{name:}.log")
return wk
[docs] def _get_TaskCls(self, *, perform_task: bool = True, **_) -> type:
if perform_task:
return WorkerTask
return NoWorkTask
[docs] def _add_sim_task(
self, *, uni_id_str: str, uni_cfg: dict, is_sweep: bool
) -> None:
"""Helper function that handles task assignment to the WorkerManager.
This function creates a WorkerTask that will perform the following
actions **once it is grabbed and worked at**:
- Create a universe (folder) for the task (simulation)
- Write that universe's configuration to a yaml file in that folder
- Create the correct arguments for the call to the model binary
To that end, this method gathers all necessary arguments and registers
a WorkerTask with the WorkerManager.
Args:
uni_id_str (str): The zero-padded uni id string
uni_cfg (dict): given by ParamSpace. Defines how many simulations
should be started
is_sweep (bool): Flag is needed to distinguish between sweeps
and single simulations. With this information, the forwarding
of a simulation's output stream can be controlled.
Raises:
RuntimeError: If adding the simulation task failed
"""
# Generate the universe basename, which will be used for the folder
# and the task name
uni_basename = f"uni{uni_id_str}"
# Create the dict that will be passed as arguments to setup_universe
setup_kwargs = dict(
model_name=self.model_name,
model_executable=self.model_executable,
uni_cfg=uni_cfg,
uni_basename=uni_basename,
)
# Process worker_kwargs
wk = self.meta_cfg.get("worker_kwargs")
if wk and wk.get("forward_streams") == "in_single_run":
# Reverse the flag to determine whether to forward streams
wk["forward_streams"] = not is_sweep
wk["forward_kwargs"] = dict(forward_raw=True)
# Try to add a task to the worker manager
TaskCls = self._get_TaskCls(**wk)
try:
self.wm.add_task(
TaskCls=TaskCls,
name=uni_basename,
priority=None,
setup_func=self._setup_universe,
setup_kwargs=setup_kwargs,
worker_kwargs=wk,
)
except RuntimeError as err:
# Task list was locked, probably because there already was a run
raise RuntimeError(
f"Could not add simulation task for universe '{uni_basename}'! "
"Did you already perform a run with this Multiverse?"
) from err
log.debug("Added simulation task: %s.", uni_basename)
[docs] def _add_sim_tasks(self, *, sweep: bool = None) -> int:
"""Adds the simulation tasks needed for a single run or for a sweep.
Args:
sweep (bool, optional): Whether tasks for a parameter sweep should
be added or only for a single universe. If None, will read the
``perform_sweep`` key from the meta-configuration.
Returns:
int: The number of added tasks.
Raises:
ValueError: On ``sweep == True`` and zero-volume parameter space.
"""
if sweep is None:
sweep = self.meta_cfg.get("perform_sweep", False)
pspace = self.meta_cfg["parameter_space"]
if not sweep:
# Only need the default state of the parameter space
uni_cfg = pspace.default
# Make a backup of the parameter space that is *actually* used
self._perform_pspace_backup(
psp.ParamSpace(uni_cfg),
filename="parameter_space",
perform_sweep=False,
)
# Add the task to the worker manager.
log.progress("Adding task for simulation of a single universe ...")
self._add_sim_task(uni_id_str="0", uni_cfg=uni_cfg, is_sweep=False)
return 1
# -- else: tasks for parameter sweep needed
if pspace.volume < 1:
raise ValueError(
"The parameter space has no sweeps configured! "
"Refusing to run a sweep. You can either call "
"the run_single method or add sweeps to your "
"run configuration using the !sweep YAML tags."
)
# Get the parameter space iterator and the number of already-existing
# tasks (to later compute the number of _added_ tasks)
psp_iter = pspace.iterator(with_info="state_no_str")
_num_tasks = len(self.wm.tasks)
# Distinguish whether to do a regular sweep or we are in cluster mode
if not self.cluster_mode:
# Make a backup of the parameter space that is *actually* used
self._perform_pspace_backup(
pspace, filename="parameter_space", perform_sweep=True
)
# Do a sweep over the whole activated parameter space
vol = pspace.volume
log.progress(
"Adding tasks for simulation of %d universes ...", vol
)
for i, (uni_cfg, uni_id_str) in enumerate(psp_iter):
self._add_sim_task(
uni_id_str=uni_id_str, uni_cfg=uni_cfg, is_sweep=True
)
print(
f" Added simulation task: {uni_id_str} ({i+1}/{vol})",
end="\r",
)
else:
# Prepare a cluster mode sweep
log.info("Preparing cluster mode sweep ...")
# Get the resolved cluster parameters
# These include the following values:
# num_nodes: The total number of nodes to simulate on. This
# is what determines the modulo value.
# node_index: Equivalent to the modulo offset, which depends
# on the position of this Multiverse's node in the
# sequence of all nodes.
rcps = self.resolved_cluster_params
num_nodes = rcps["num_nodes"]
node_index = rcps["node_index"]
# Back up the actually-used parameter space. Do this only on the
# first node to avoid file-writing conflicts between nodes
if node_index == 0:
self._perform_pspace_backup(
pspace, filename="parameter_space", perform_sweep=True
)
# Inform about the number of universes to be simulated
log.progress(
"Adding tasks for cluster-mode simulation of "
"%d universes on this node (%d of %d) ...",
(
pspace.volume // num_nodes
+ (pspace.volume % num_nodes > node_index)
),
node_index + 1,
num_nodes,
)
for i, (uni_cfg, uni_id_str) in enumerate(psp_iter):
# Skip if this node is not responsible
if (i - node_index) % num_nodes != 0:
log.debug("Skipping: %s", uni_id_str)
continue
# Is valid for this node, add the simulation task
self._add_sim_task(
uni_id_str=uni_id_str, uni_cfg=uni_cfg, is_sweep=True
)
num_new_tasks = len(self.wm.tasks) - _num_tasks
log.info("Added %d tasks.", num_new_tasks)
return num_new_tasks
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
[docs]class FrozenMultiverse(Multiverse):
"""A frozen Multiverse is like a Multiverse, but frozen.
It is initialized from a finished :py:class:`~utopya.multiverse.Multiverse`
run and re-creates all the attributes from that data, e.g.: the meta
configuration, a DataManager, and a PlotManager.
.. note::
A frozen multiverse is no longer able to perform any simulations.
"""
[docs] def __init__(
self,
*,
model_name: str = None,
info_bundle: ModelInfoBundle = None,
run_dir: str = None,
run_cfg_path: str = None,
user_cfg_path: str = None,
use_meta_cfg_from_run_dir: bool = False,
**update_meta_cfg,
):
"""Initializes the FrozenMultiverse from a model name and the name
of a run directory.
Note that this also takes arguments to specify the run configuration to
use.
Args:
model_name (str): The name of the model to load. From this, the
model output directory is determined and the run_dir will be
seen as relative to that directory.
info_bundle (ModelInfoBundle, optional): The model information
bundle that includes information about the binary path etc.
If not given, will attempt to read it from the model registry.
run_dir (str, optional): The run directory to load. Can be a path
relative to the current working directory, an absolute path,
or the timestamp of the run directory. If not given, will use
the most recent timestamp.
run_cfg_path (str, optional): The path to the run configuration.
user_cfg_path (str, optional): If given, this is used to update the
base configuration. If None, will look for it in the default
path, see Multiverse.USER_CFG_SEARCH_PATH.
use_meta_cfg_from_run_dir (bool, optional): If True, will load the
meta configuration from the given run directory; only works for
absolute run directories.
**update_meta_cfg: Can be used to update the meta configuration
generated from the previous configuration levels
"""
# First things first: get the info bundle
self._info_bundle = get_info_bundle(
model_name=model_name, info_bundle=info_bundle
)
log.progress(
"Initializing FrozenMultiverse for '%s' model ...", self.model_name
)
# Initialize property-managed attributes
self._meta_cfg = None
self._dirs = dict()
self._resolved_cluster_params = None
# Decide whether to load the meta configuration from the given run
# directory or the currently available one.
if (
use_meta_cfg_from_run_dir
and isinstance(run_dir, str)
and os.path.isabs(run_dir)
):
raise NotImplementedError("use_meta_cfg_from_run_dir")
# Find the meta config backup file and load it
# Alternatively, create it from the singular backup files ...
# log.info("Trying to load meta configuration from given absolute "
# "run directory ...")
# Update it with the given update_meta_cfg dict
else:
# Need to create a meta configuration from the currently available
# values.
mcfg, _ = self._create_meta_cfg(
run_cfg_path=run_cfg_path,
user_cfg_path=user_cfg_path,
update_meta_cfg=update_meta_cfg,
)
# Only keep selected entries from the meta configuration. The rest is
# not needed and is deleted in order to not confuse the user with
# potentially varying versions of the meta config.
self._meta_cfg = {
k: v
for k, v in mcfg.items()
if k
in (
"debug_level",
"paths",
"data_manager",
"plot_manager",
"cluster_mode",
"cluster_params",
)
}
log.info("Built meta configuration.")
log.remark(" Debug level: %d", self.debug_level)
self._apply_debug_level()
# Need to make some DataManager adjustments; do so via update dicts
dm_cluster_kwargs = dict()
if self.cluster_mode:
log.note("Cluster mode enabled.")
self._resolved_cluster_params = self._resolve_cluster_params()
rcps = self.resolved_cluster_params # creates a deep copy
log.note(
"This is node %d of %d.",
rcps["node_index"] + 1,
rcps["num_nodes"],
)
# Changes to the meta configuration
# To avoid config file collisions in the PlotManager:
self._meta_cfg["plot_manager"]["cfg_exists_action"] = "skip"
# _Additional_ arguments to pass to DataManager.__init__ below
timestamp = rcps["timestamp"]
dm_cluster_kwargs = dict(
out_dir_kwargs=dict(timestamp=timestamp, exist_ok=True)
)
# Generate the path to the run directory that is to be loaded
self._get_run_dir(**self.meta_cfg["paths"], run_dir=run_dir)
log.note("Run directory:\n %s", self.dirs["run"])
# Create a data manager
self._dm = DataManager(
self.dirs["run"],
name=f"{self.model_name}_data",
**self.meta_cfg["data_manager"],
**dm_cluster_kwargs,
)
log.progress("Initialized DataManager.")
# Instantiate the PlotManager via the helper method
self._pm = self._setup_pm()
log.progress("Initialized FrozenMultiverse.\n")
[docs] def _create_run_dir(self, *_, **__):
"""Overload of parent method, for safety: we should not create a new
run directory."""
raise AttributeError(
f"`_create_run_dir` method should not be called from {type(self)}"
)
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
[docs]class DistributedMultiverse(FrozenMultiverse):
"""A distributed Multiverse is like a Multiverse, but initialized from an
existing meta-configuration.
It re-creates the WorkerManager attributes from that configuration and
is not allowed to change the meta-configuration that was loaded.
"""
[docs] def __init__(
self,
*,
run_dir: str,
model_name: str = None,
info_bundle: ModelInfoBundle = None,
):
"""Initializes a DistributedMultiverse from a model name and the name
of an existing run directory.
Args:
run_dir (str, optional): The run directory to load. Can be a path
relative to the current working directory, an absolute path,
or the timestamp of the run directory. If not given, will use
the most recent timestamp.
model_name (str): The name of the model to load. From this, the
model output directory is determined and the run_dir will be
seen as relative to that directory.
info_bundle (ModelInfoBundle, optional): The model information
bundle that includes information about the binary path etc.
If not given, will attempt to read it from the model registry.
"""
# First things first: get the info bundle
if info_bundle is None:
info_bundle = get_info_bundle(
model_name=model_name, info_bundle=info_bundle
)
self._info_bundle = info_bundle
log.progress(
"Initializing DistributedMultiverse for '%s' model ...",
self.model_name,
)
# Initialize property-managed attributes
self._dirs = dict()
self._model_executable = None
self._tmpdir = None
self._clear_existing_output = False
# Generate the path to the run directory that is to be loaded
# TODO Extract out dir from available infos
self._get_run_dir(out_dir=None, run_dir=run_dir)
log.note("Run directory:\n %s", self.dirs["run"])
# Load the meta config
meta_cfg_path = os.path.join(run_dir, "config", "meta_cfg.yml")
if not os.path.isfile(meta_cfg_path):
raise ValueError(
"No meta config found in run dir {} at {}!",
run_dir,
meta_cfg_path,
)
log.debug("Loading from meta config at %s ...", meta_cfg_path)
# Only keep selected entries from the meta configuration. The rest is
# not needed and is deleted in order to not confuse the user with
# potentially varying versions of the meta config.
mcfg = load_yml(meta_cfg_path)
self._meta_cfg = {
k: v
for k, v in mcfg.items()
if k
not in (
"data_manager",
"plot_manager",
"cluster_mode",
"cluster_params",
)
}
self._meta_cfg["cluster_mode"] = False
log.info("Restored meta-configuration.")
log.remark(" Debug level: %d", self.debug_level)
self._apply_debug_level()
# Prepare the executable
self._prepare_executable(**self.meta_cfg["executable_control"])
self._wm = WorkerManager(**self.meta_cfg["worker_manager"])
self._reporter = WorkerManagerReporter(
self.wm,
mv=self,
report_dir=self.dirs["run"],
**self.meta_cfg["reporter"],
)
log.progress("Initialized DistributedMultiverse.\n")
[docs] def run_selection(
self,
*,
uni_id_strs: List[str],
num_workers=None,
clear_existing_output: bool = False,
):
"""Starts a simulation run for specified universes.
Specifically, this method adds simulation tasks to the associated
WorkerManager, locks its task list, and then invokes the
:py:meth:`~utopya.workermanager.WorkerManager.start_working` method
which performs all the simulation tasks.
.. note::
As this method locks the task list of the
:py:class:`~utopya.workermanager.WorkerManager`, no further tasks
can be added henceforth. This means, that each Multiverse instance
can only perform a single simulation run.
Args:
uni_id_strs(List[str]): The list of universe ID strings to run,
e.g. '00154'. Note that leading zeros are required.
num_workers (int, optional): Specify the number of workers
to use, overwriting the setting from the meta-configuration.
clear_existing_output (bool, optional): Whether to remove
existing files from the universe outpout directories.
Only the configuration file will not be deleted.
"""
log.info(
"Preparing for simulation run of %d universe(s) (%s) ...",
len(uni_id_strs),
uni_id_strs,
)
# Store parameter, used during universe dir setup
self._clear_existing_output = clear_existing_output
# Generate all possible parameter space combinations
pspace = self.meta_cfg["parameter_space"]
psp_iter = pspace.iterator(with_info="state_no_str")
uni_cfgs = dict()
for uni_cfg, uni_id_str in psp_iter:
uni_cfgs[uni_id_str] = uni_cfg
# Now, add the respective tasks, if the selection matches
is_sweep = len(uni_id_strs) > 1
for i, uni_id_str in enumerate(uni_id_strs):
if uni_id_str.startswith("uni"):
uni_id_str = uni_id_str[3:]
uni_cfg = uni_cfgs.get(uni_id_str, None)
if uni_cfg is None:
raise ValueError(f"No universe '{uni_id_str}' found!")
self._add_sim_task(
uni_id_str=uni_id_str,
uni_cfg=uni_cfg,
is_sweep=is_sweep,
)
self.wm.tasks.lock()
if num_workers is not None:
self.wm.num_workers = num_workers
# Tell the WorkerManager to start working (is a blocking call)
self.wm.start_working(**self.meta_cfg["run_kwargs"])
# Done! :)
log.success("Finished simulation run. Wohoo. :)\n")
[docs] def run(
self,
*,
num_workers=None,
clear_existing_output: bool = False,
skip_existing_output: bool = False,
):
"""Starts a simulation run for all universes.
Overload of parent method that allows for universes to be skipped if
output already exists from a previous run.
Args:
num_workers (int, optional): Specify the number of workers
available.
clear_existing_output (bool, optional): Whether to remove
files in the output directory of the universes other than the
configuration file.
skip_existing_output (bool, optional): Whether to skip
universes if output already exists.
"""
ALLOWED_FILES = ("config.yml",) # make sure these are sorted
log.info("Preparing for repeated simulation run ...")
# Store parameter, used during universe dir setup
self._clear_existing_output = clear_existing_output
# Generate all possible parameter space combinations
pspace = self.meta_cfg["parameter_space"]
psp_iter = pspace.iterator(with_info="state_no_str")
# Gather uni configs
# TODO Rework this: skip-existing decision should be taken later!
uni_cfgs = dict()
for uni_cfg, uni_id_str in psp_iter:
if skip_existing_output:
output_exists = False
uni_basename = f"uni{uni_id_str}"
uni_dir = os.path.join(self.dirs["data"], uni_basename)
# Check existence of path and its content
if os.path.isdir(uni_dir):
for file in os.listdir(uni_dir):
if file not in ALLOWED_FILES:
output_exists = True
break
# only add uni to tasks if no output exists
if not output_exists:
uni_cfgs[uni_id_str] = uni_cfg
else:
uni_cfgs[uni_id_str] = uni_cfg
log.info(" Found %d universe(s) to work on.", len(uni_cfgs))
# Now, add the respective tasks, if the selection matches
is_sweep = len(uni_cfgs) > 1
for uni_id_str, uni_cfg in uni_cfgs.items():
self._add_sim_task(
uni_id_str=uni_id_str,
uni_cfg=uni_cfg,
is_sweep=is_sweep,
)
self.wm.tasks.lock()
if num_workers is not None:
self.wm.num_workers = num_workers
# Tell the WorkerManager to start working (is a blocking call)
self.wm.start_working(**self.meta_cfg["run_kwargs"])
# Done! :)
log.success("Finished simulation run. Wohoo. :)\n")
[docs] def _prepare_executable(self, *args, **kwargs) -> None:
if self.meta_cfg["backups"]["backup_executable"]:
execpath = os.path.join(
self.dirs["run"], "backup", self.model_name
)
if not os.path.isfile(execpath):
raise FileNotFoundError(f"No executable found at {execpath}!")
log.remark("Restored executable at:\n %s", execpath)
self._model_executable = execpath
return super()._prepare_executable(*args, **kwargs)
# .. Overloads for universe setup .........................................
[docs] def _get_TaskCls(self, **_) -> type:
return WorkerTask
[docs] def _setup_universe_dir(self, uni_dir: str, *, uni_basename: str):
"""Overload of parent method that allows for universe directories to
already exist."""
ALLOWED_FILES = ("config.yml",) # make sure these are sorted
if not os.path.isdir(uni_dir):
# Easy, set up from scratch:
super()._setup_universe_dir(
uni_dir=uni_dir, uni_basename=uni_basename
)
return
# else: already exists.
# Allow to remove existing output from the directory
if self._clear_existing_output and len(os.listdir(uni_dir)) > 1:
for file in os.listdir(uni_dir):
if file in ALLOWED_FILES:
continue
os.remove(os.path.join(uni_dir, file))
# Check that the universe directory is empty
existing_files = os.listdir(uni_dir)
if existing_files and sorted(existing_files) != list(ALLOWED_FILES):
raise RuntimeError(
f"Output directory of universe '{uni_basename}' contains "
f"files other than ({', '.join(ALLOWED_FILES)}). "
"Did you already perform a run on this universe? "
f"Found the following files: {', '.join(existing_files)}"
)
[docs] def _setup_universe_config(self, *, uni_cfg_path: str, **kwargs) -> dict:
"""Overload of parent method that checks if a universe config already
exists and, if so, loads that one instead of storing a new one.
"""
if os.path.isfile(uni_cfg_path):
# Can restore it
log.debug("Restoring universe config from:\n %s.", uni_cfg_path)
uni_cfg = load_yml(uni_cfg_path)
else:
# Need to create it
uni_cfg = super()._setup_universe_config(
uni_cfg_path=uni_cfg_path, **kwargs
)
return uni_cfg