Source code for utopya._yaml_registry.registry

"""Implements a YAML-based registry infrastructure"""

import copy
import logging
import os

import dantro.utils

from ..exceptions import (
from import make_columns, pformat, recursive_update
from .entry import RegistryEntry

log = logging.getLogger(__name__)

# -----------------------------------------------------------------------------

[docs]class KeyOrderedDict(dantro.utils.KeyOrderedDict): """A key-ordered dict that expects string keys and sorts by the lower-case representation of keys. """ DEFAULT_KEY_COMPARATOR = lambda _, k: k.lower()
# -----------------------------------------------------------------------------
[docs]class YAMLRegistry: """A registry framework that persistently stores the registry entries as YAML files within a common directory. Individual registry entries can be retrieved via a dict-like interface. """
[docs] def __init__(self, EntryCls: type, *, registry_dir: str): """Set up a registry directory for a certain class of registry entries. Args: EntryCls (type): Type of the individual entries registry_dir (str): Path to the directory in which the individual registry entry files are to be stored. """ if not issubclass(EntryCls, RegistryEntry): raise TypeError( f"EntryCls needs to be a subclass of {RegistryEntry}, " f"but was {EntryCls}!" ) self._registry_dir = registry_dir self._EntryCls = EntryCls self._registry = KeyOrderedDict() self._load_errors = dict() self.reload()
@property def registry_dir(self) -> str: """The associated registry directory""" return self._registry_dir def __str__(self) -> str: return "<{}, entry type: {}, @ {} >".format( type(self).__name__, self._EntryCls.__name__, self.registry_dir )
[docs] def reload(self): """Load all available entries from the registry directory. If called multiple times, will only load entries that are not already loaded. """ log.debug("Disassociating existing entries ...") for entry in self.values(): entry._registry = None self._registry = KeyOrderedDict() log.debug("Re-loading entries from registry directory ...") new_entries = [] for fname in os.listdir(self.registry_dir): name, ext = os.path.splitext(fname) if name in self or ext != self._EntryCls.FILE_EXTENSION: continue # Try to load it try: entry = self._EntryCls(name=name, registry=self) except Exception as exc: self._load_errors[name] = exc else: self._registry[] = entry new_entries.append(name) # Inform about errors if self._load_errors: err_info = "\n\n".join( f"- {name}: {exc}" for name, exc in self._load_errors.items() ) log.error( "There were errors during loading of %d project(s):\n\n%s\n", len(self._load_errors), err_info, ) log.caution( "These missing projects may cause errors later on; it is " "best to address them, e.g. by editing the project registry " "or by removing and re-registering a project.\n" ) log.debug( "Loaded %s entr%s: %s", len(new_entries), "ies" if len(new_entries) != 1 else "y", ", ".join(new_entries), )
# Dict interface ..........................................................
[docs] def keys(self): return self._registry.keys()
[docs] def values(self): return self._registry.values()
[docs] def items(self): return self._registry.items()
def __iter__(self): return self._registry.__iter__()
[docs] def __contains__(self, name: str) -> bool: """Whether an entry of the given name exists in the registry""" return name in self._registry
def __len__(self) -> int: return len(self._registry) # Working on entries ......................................................
[docs] def __getitem__(self, name: str): """Retrieve a deep copy of a model registration entry for the given model name. """ try: return self._registry[name] except KeyError as err: raise MissingEntryError( f"{self._EntryCls.__name__} '{name}' not found in {self}! " f"Available entries:\n{make_columns(self.keys())}" ) from err
[docs] def __delitem__(self, name: str): """Removes a registry entry""" self.remove_entry(name)
# Adding and removing .....................................................
[docs] def add_entry( self, name: str, *, exists_action: str = "raise", **data ) -> RegistryEntry: """Creates a new entry and stores it in the registry. If an entry of the same name already exists, allows according to the ``exists_action`` Adds a new entry of a certain name; raises if it already exists. TODO Write Args: name (str): Description exists_action (str, optional): Description **data: Description Returns: RegistryEntry: Description Raises: EntryExistsError: Description ValidationError: ValueError: """ # Construct the entry itself, but *without* associating it with the # registry -- this allows to evaluate the exists action: new_entry = self._EntryCls(name=name, registry=None, **data) if name in self: log.caution("An entry named '%s' already exists!", name) if exists_action == "raise": raise EntryExistsError( f"An entry '{name}' in {self} already exists! " "Either remove it or choose a different `exists_action`." ) elif exists_action == "validate": log.remark("Validating new entry against existing entry ...") if self[name] != new_entry: # Generate a diff such that its clearer where they differ import difflib import json natify = lambda d: json.loads(d.model_dump_json()) diff = "\n".join( difflib.Differ().compare( pformat(natify(self[name])).split("\n"), pformat(natify(new_entry)).split("\n"), ) ) raise EntryValidationError( f"Validation of project '{name}' failed!\n" "The to-be-added project information did not compare " "equal to the already existing one for that project.\n" "Either change the `exists_action` argument to " "'overwrite' or 'update' or make sure the information " f"is equal.\nTheir YAML diff is as follows:\n\n{diff}" ) # else: no need to change anything below log.remark("Validation of entry '%s' succeeded.", name) return elif exists_action == "update": log.remark("Updating existing entry with new entry ...") data = recursive_update(self[name].dict(), copy.deepcopy(data)) new_entry = self._EntryCls(name=name, registry=None, **data) elif exists_action == "overwrite": log.remark("Overwriting already existing entry ...") pass elif exists_action == "skip": log.remark("Not adding the new entry.") return else: raise ValueError( f"Invalid `exists_action` '{exists_action}'!\n" "Possible values: raise, validate, update, overwrite, skip" ) # Now, make the registry association, store it here and write the file new_entry._set_registry(self) new_entry.write() self._registry[] = new_entry log.success("Added entry '%s'.", return new_entry
[docs] def remove_entry(self, name: str): """Removes a registry entry and deletes the associated registry file. Args: name (str): The name of the entry that is to be removed Raises: ValueError: On invalid (non-existing) model """ try: entry = self._registry.pop(name) except KeyError as err: if name not in self._load_errors: raise MissingEntryError( f"Could not remove registry entry '{name}', because " "no such entry is present.\nAvailable entries:\n" f"{make_columns(self.keys())}" ) from err # Don't have entry, but can still remove the file log.caution( "Removing (potentially corrupt) registry file " "for entry '%s' ...", name, ) fpath = os.path.join( self._registry_dir, f"{name}{self._EntryCls.FILE_EXTENSION}" ) os.remove(fpath) log.debug("Removed registry file: %s", fpath) return else: log.debug("Removed entry '%s' from %s.", name, self) entry.remove_registry_file() log.debug( "Removed associated registry file: %s", entry.registry_file_path ) entry._registry = None
# Entry goes out of scope now and is then be garbage-collected if it # does not exist anywhere else... Only if some action is taken on that # entry does it lead to file being created again.