Source code for utopya.model_registry.info_bundle

"""Implements the ModelInfoBundle class, which holds a single bundle of
information about a model.
"""

import copy
import logging
import os
import time
from typing import Optional, Sequence, Tuple, Union

from ..exceptions import MissingProjectError
from ..tools import load_selected_keys, load_yml, pformat, recursive_update

log = logging.getLogger(__name__)

TIME_FSTR = "%y%m%d-%H%M%S"
"""Format string to use for generating readable time stamps"""

# -----------------------------------------------------------------------------


[docs]class ModelInfoBundle: """A bundle of model information; behaves like a read-only dict""" SRC_DIR_SEARCH_PATHS = { "model_info": "{model_name:}_info.yml", "default_cfg": "{model_name:}_cfg.yml", "default_plots": "{model_name:}_plots.yml", "base_plots": "{model_name:}_base_plots.yml", } """Path keys that can be searched within the source directory. Values are format strings that are evaluated with additional information being available (``model_name``). The resulting paths are interpreted as paths relative to the source directory. """ FSTR_SUFFIX = "_fstr" """A suffix used to detect format strings in the path parsing routine. If a key ends with such a suffix, the corresponding value is assumed to be a format string and is evaluated. The resulting path is stored, dropping the suffix. """ PATHS_SCHEMA = ( ("executable", str), ("model_info", str), ("source_dir", str), ("default_cfg", str), ("default_plots", str), ("base_plots", str), ("py_tests_dir", str), ("py_plots_dir", str), ("mv_model_cfg", str), ) """Schema to use for a bundle's ``paths`` entry""" METADATA_SCHEMA = ( ("version", str), ("long_name", str), ("description", str), ("long_description", str), ("license", str), ("author", str), ("email", str), ("website", str), ("utopya_compatibility", str), ("language", str), ("requirements", list), ("misc", dict), ) """Schema to use for a bundle's ``metadata`` entry""" # .........................................................................
[docs] def __init__( self, *, model_name: str, paths: dict, metadata: dict = None, project_name: str = None, eval_after_run: bool = None, registration_time: str = None, missing_path_action: str = "log", extract_model_info: bool = False, ): """Initialize a ModelInfoBundle Args: model_name (str): Name of the model this info bundle describes paths (dict): A dictionary of paths metadata (dict, optional): A dictionary of metadata entries project_name (str, optional): The project this model is part of eval_after_run (bool, optional): Whether a model run should be followed by an evaluation. registration_time (str, optional): Timestamp of registration missing_path_action (str, optional): Action upon a path in the ``paths`` dict that does not exist. Can be ``ignore``, ``log``, ``warn``, ``raise``. extract_model_info (bool, optional): Whether to use the information from a potentially existing file at the ``model_info`` path to pre-populate this bundle. Any explicitly given arguments take precedence over the information from that file. """ self._model_name = model_name self._reg_time = ( registration_time if registration_time is not None else time.strftime(TIME_FSTR) ) # Parse paths that were passed as argument here paths = self._parse_paths( **{k: os.path.expanduser(p) for k, p in paths.items() if p}, missing_path_action=missing_path_action, ) # If a model info file is known at this point, use it to pre-populate # the data dict. self._d = dict( paths=dict(), metadata=dict(), project_name=project_name, eval_after_run=eval_after_run, ) if extract_model_info and paths.get("model_info"): self._d = recursive_update( self._d, self._load_and_parse_model_info(paths["model_info"]), ) # Now populate the data dict with the explicitly passed arguments, # which should take precedence over the information from the model info err_msg_fstr = "Failed loading {} info for '{}' model info bundle!" load_selected_keys( paths, add_to=self.paths, keys=self.PATHS_SCHEMA, err_msg_prefix=err_msg_fstr.format("paths", model_name), ) load_selected_keys( (metadata if metadata is not None else {}), add_to=self.metadata, keys=self.METADATA_SCHEMA, err_msg_prefix=err_msg_fstr.format("metadata", model_name), ) log.debug( "Created configuration bundle for model '%s'.", self.model_name )
# Magic methods ...........................................................
[docs] def __eq__(self, other) -> bool: """Compares equality by comparing the stored configuration. Only if another ModelInfoBundle is compared against does the model name also take part in the comparison. """ if isinstance(other, ModelInfoBundle): return self._d == other._d and self._model_name == other.model_name return self._d == other
# Formatting .............................................................. def __str__(self) -> str: return "<Info bundle for '{}' model>\n{}\n".format( self._model_name, pformat(self._d) ) # General Access .......................................................... @property def model_name(self) -> str: return self._model_name @property def registration_time(self) -> str: """Registration time string of this bundle""" return self._reg_time @property def as_dict(self) -> dict: """Returns a deep copy of all bundle data. This does NOT include the model name and the registration time.""" return copy.deepcopy(self._d)
[docs] def __getitem__(self, key: str): """Direct access to the full bundle data""" return self._d[key]
@property def executable(self) -> Optional[str]: """The path to the model executable""" return self._d["paths"].get("executable") @property def paths(self) -> dict: """Access to the paths information of the bundle""" return self._d["paths"] @property def metadata(self) -> dict: """Access to the metadata information of the bundle""" return self._d["metadata"] @property def project_name(self) -> str: """Access to the Utopia project name information of the bundle""" return self._d["project_name"] @property def eval_after_run(self) -> Union[bool, None]: """Whether a model run should be followed by the evaluation routine.""" return self._d["eval_after_run"] @property def project(self) -> Union[None, dict]: """Load the project information corresponding to this project. Will be None if no project is associated. """ if not self.project_name: return None from .. import PROJECTS return PROJECTS[self.project_name] @property def missing_paths(self) -> dict: """Returns those paths where os.path.exists did not evaluate to True""" return {k: p for k, p in self.paths.items() if not os.path.exists(p)} # Helpers .................................................................
[docs] def _load_and_parse_model_info(self, path: str) -> dict: """Loads the model info file from the given path and parses it. Parsing steps: - Remove entries ``model_name`` or ``label`` that are not relevant at this point. Args: path (str): Path to the model info YAML file """ d = load_yml(path) for key in ("model_name", "label"): d.pop(key, None) return d
[docs] def _parse_paths( self, *, missing_path_action: str, executable: str = None, source_dir: str = None, base_executable_dir: str = None, base_source_dir: str = None, model_info: str = None, **more_paths, ) -> dict: """Given the path arguments, parse them into absolute paths. There are the following parsing steps: 1. Evaluate ``executable`` and ``source_dir``, potentially using the ``base_*`` arguments to resolve relative paths. 2. If a ``model_info`` path is given, the directory of that file is used for any *missing* ``base_*`` argument. 3. If a ``source_dir`` was given, that directory is searched for further existing paths using ``SRC_DIR_SEARCH_PATHS``. 4. Empty entries in ``more_paths`` are skipped 5. Remaining entries in ``more_paths`` that end with the suffix specified by the ``FSTR_SUFFIX`` class attribute are interpreted as format strings and resolved by providing information on the ``model_name`` and all available ``paths``. Relative paths are interpreted as relative to the source directory. 6. For all paths, it is checked whether they exist. The ``missing_path_action`` argument determines what to do if not. """ paths = dict() # Make sure the base_* are not None (makes os.path.join call easier) base_executable_dir = ( base_executable_dir if base_executable_dir is not None else "" ) base_source_dir = ( base_source_dir if base_source_dir is not None else "" ) # If a model info file is given, can use its directory as a base # directory to determine the missing base_* arguments model_info_dir = "" if model_info: if not os.path.isabs(model_info): raise ValueError( "The path to the model info file needs to be absolute, " f"but was: {model_info}" ) model_info_dir = os.path.dirname(model_info) if not base_source_dir: base_source_dir = model_info_dir if not base_executable_dir: base_executable_dir = model_info_dir paths["model_info"] = model_info # Evaluate the executable file path if executable is not None: paths["executable"] = os.path.join(base_executable_dir, executable) # Prepare an absolute version of the source directory path abs_source_dir_path = None if source_dir: abs_source_dir_path = os.path.realpath( os.path.join(base_source_dir, source_dir) ) # If a source directory is given, store it, then auto-detect some files if abs_source_dir_path: paths["source_dir"] = abs_source_dir_path for key, fname_fstr in self.SRC_DIR_SEARCH_PATHS.items(): # Build the full file path, making the model name available. # If that path points to an existing file or directory, add it. fname = fname_fstr.format(model_name=self.model_name) fpath = os.path.join(abs_source_dir_path, fname) if os.path.exists(fpath): paths[key] = fpath # Parse remaining path entries for key, path in more_paths.items(): if path is None: continue # Evaluate potentially existing format string style paths, adding # them to the paths dict *without* the suffix that was used to # identify them. if key.endswith(self.FSTR_SUFFIX): subkey = key[: -len(self.FSTR_SUFFIX)] path = path.format(model_name=self.model_name, paths=paths) if subkey in paths or not os.path.exists(path): continue # Path exists and was not yet added; store it under the subkey key = subkey # Interpret relative paths as relative to the source directory if not os.path.isabs(path): # Is relative. Try to resolve it, starting with a path relative # to the source directory. As a fallback, can still attempt to # interpret it relative to the model info file. if key in self.SRC_DIR_SEARCH_PATHS and abs_source_dir_path: path = os.path.join(abs_source_dir_path, path) elif model_info_dir: path = os.path.join(model_info_dir, path) else: raise ValueError( f"Given `{key}` path ({path}) was relative, but " "no source directory was specified relative to which " "this path could be interpreted and no model " "information file was available either!" ) # All good, can now store it. If it was a path that is not to be # seen as relative to the source directory, that will lead to an # error being thrown below ... paths[key] = path # Done populating paths dict. # Make sure paths are all absolute and exist. for key, path in paths.items(): if not os.path.isabs(path): raise ValueError( f"The given `{key}` path ({path}) for info bundle " f"of model '{self.model_name}' was not absolute! " "Please provide only absolute paths (may include ~)." ) if not os.path.exists(path): msg = ( f"Given '{key}' path for model '{self.model_name}' does " f"not exist: {path}" ) if missing_path_action == "warn": log.warning(msg) elif missing_path_action in ("log", "ignore"): log.debug(msg) elif missing_path_action == "raise": raise ValueError( msg + "\nEither adjust the corresponding model info " "bundle or place the expected file at that path. " "To ignore this error, pass the `missing_path_action` " "argument to ModelInfoBundle." ) else: raise ValueError( f"Invalid missing_path_action '{missing_path_action}'!" " Choose from: ignore, log, warn, raise." ) return paths
# YAML representation .....................................................
[docs] @classmethod def to_yaml(cls, representer, node): """Creates a YAML representation of the data stored in this bundle. Args: representer (ruamel.yaml.representer): The representer module node (ModelInfoBundle): The node to represent, i.e. an instance of this class. Returns: A YAML representation of the given instance of this class. """ return representer.represent_data(node._d)