utopya package#
The utopya package provides a simulation management and
evaluation framework with the following components:
- A registry framework for models (model_registry) and projects (ProjectRegistry)
- A configuration manager and simulation runner, the Multiverse:
  - Contains a multi-level configuration interface
  - Parallel simulation execution via WorkerManager
- Coupling to the dantro data evaluation pipeline, integrated via utopya.eval:
  - Custom data groups and containers
  - An eval.plotmanager.PlotManager that takes into account project- or model-specific plot function definitions
- The model.Model abstraction, which allows convenient interactive work with utopya and registered models
- The testtools.ModelTest class, containing specializations that make it more convenient to implement model tests using utopya
- Batch simulation running and evaluation via batch
For a real-world example of how utopya can be integrated, have a look at the
Utopia modelling framework which uses
utopya as its frontend.
For model implementations, the utopya_backend package can assist in
building Python-based models that use utopya as a frontend.
Also visit the user manual front page for more information.
Subpackages#
- utopya._yaml_registry package
- Submodules
- utopya._yaml_registry.entry module
- BaseSchema, RegistryEntry, RegistryEntry.SCHEMA, RegistryEntry.FILE_EXTENSION, RegistryEntry._NO_FORWARDING_ATTRS, RegistryEntry.__init__(), RegistryEntry._parse_data(), RegistryEntry._set_registry(), RegistryEntry.name, RegistryEntry.has_registry, RegistryEntry.registry_dir, RegistryEntry.registry_file_path, RegistryEntry.data, RegistryEntry.dict(), RegistryEntry.__str__(), RegistryEntry.__eq__(), RegistryEntry.__getattr__(), RegistryEntry.__setattr__(), RegistryEntry.__getitem__(), RegistryEntry.get(), RegistryEntry.load(), RegistryEntry.write(), RegistryEntry.remove_registry_file()
- utopya._yaml_registry.registry module
- utopya.eval package
- Subpackages
- utopya.eval.plots package
- Submodules
- utopya.eval.plots._attractor module
- utopya.eval.plots._graph module
- utopya.eval.plots._mpl module
- utopya.eval.plots._utils module
- utopya.eval.plots.abm module
- utopya.eval.plots.attractor module
- utopya.eval.plots.ca module
- utopya.eval.plots.distributions module
- utopya.eval.plots.graph module
- utopya.eval.plots.time_series module
- Submodules
- utopya.eval._plot_func_resolver module
- utopya.eval.containers module
- NumpyDC, XarrayDC, XarrayDC.PROXY_RESOLVE_ASTYPE, XarrayDC.PROXY_RETAIN, XarrayDC.PROXY_REINSTATE_FAIL_ACTION, XarrayDC._XRC_DIMS_ATTR, XarrayDC._XRC_DIM_NAME_PREFIX, XarrayDC._XRC_COORDS_ATTR_PREFIX, XarrayDC._XRC_COORDS_MODE_DEFAULT, XarrayDC._XRC_COORDS_MODE_ATTR_PREFIX, XarrayDC._XRC_INHERIT_CONTAINER_ATTRIBUTES, XarrayDC._XRC_STRICT_ATTR_CHECKING, XarrayDC._abc_impl
- XarrayYamlDC, GridDC, GridDC._GDC_grid_shape_attr, GridDC._GDC_space_extent_attr, GridDC._GDC_index_order_attr, GridDC._GDC_grid_structure_attr, GridDC.__init__(), GridDC._abc_impl, GridDC._postprocess_proxy_resolution(), GridDC._parse_sizes_from_metadata(), GridDC.grid_shape, GridDC.space_extent, GridDC.shape, GridDC.ndim, GridDC._determine_shape(), GridDC._reshape_data()
- utopya.eval.data_ops module
- utopya.eval.datamanager module
- utopya.eval.groups module
- UniverseGroup, MultiverseGroup, TimeSeriesGroup, HeterogeneousTimeSeriesGroup, GraphGroup, GraphGroup._NEW_GROUP_CLS, GraphGroup._ALLOWED_CONT_TYPES, GraphGroup._GG_node_container, GraphGroup._GG_edge_container, GraphGroup._GG_attr_directed, GraphGroup._GG_attr_parallel, GraphGroup._GG_attr_edge_container_is_transposed, GraphGroup._GG_attr_keep_dim, GraphGroup._GG_WARN_UPON_BAD_ALIGN, GraphGroup._abc_impl
- utopya.eval.plotcreators module
- utopya.eval.plothelper module
- utopya.eval.plotmanager module
- utopya.eval.transform module
- utopya.model_registry package
- MODELS
- Submodules
- utopya.model_registry._registration module
- utopya.model_registry.entry module
- ModelRegistryEntry, ModelRegistryEntry.__init__(), ModelRegistryEntry.model_name, ModelRegistryEntry.registry_dir, ModelRegistryEntry.registry_file_path, ModelRegistryEntry.default_label, ModelRegistryEntry.default_bundle, ModelRegistryEntry.__len__(), ModelRegistryEntry.__contains__(), ModelRegistryEntry.__eq__(), ModelRegistryEntry.__getitem__(), ModelRegistryEntry.item(), ModelRegistryEntry.keys(), ModelRegistryEntry.values(), ModelRegistryEntry.items(), ModelRegistryEntry.add_bundle(), ModelRegistryEntry.pop(), ModelRegistryEntry.clear(), ModelRegistryEntry.set_default_label(), ModelRegistryEntry._load_from_registry(), ModelRegistryEntry._update_registry_file(), ModelRegistryEntry._as_dict(), ModelRegistryEntry.to_yaml()
- utopya.model_registry.info_bundle module
- TIME_FSTR, ModelInfoBundle, ModelInfoBundle.SRC_DIR_SEARCH_PATHS, ModelInfoBundle.FSTR_SUFFIX, ModelInfoBundle.PATHS_SCHEMA, ModelInfoBundle.METADATA_SCHEMA, ModelInfoBundle.__init__(), ModelInfoBundle.__eq__(), ModelInfoBundle.model_name, ModelInfoBundle.registration_time, ModelInfoBundle.as_dict, ModelInfoBundle.__getitem__(), ModelInfoBundle.executable, ModelInfoBundle.paths, ModelInfoBundle.metadata, ModelInfoBundle.project_name, ModelInfoBundle.eval_after_run, ModelInfoBundle.project, ModelInfoBundle.missing_paths, ModelInfoBundle._load_and_parse_model_info(), ModelInfoBundle._parse_paths(), ModelInfoBundle.to_yaml()
- utopya.model_registry.registry module
- KeyOrderedDict, ModelRegistry, ModelRegistry.__init__(), ModelRegistry.registry_dir, ModelRegistry.info_str, ModelRegistry.info_str_detailed, ModelRegistry.keys(), ModelRegistry.values(), ModelRegistry.items(), ModelRegistry.__contains__(), ModelRegistry.__getitem__(), ModelRegistry.register_model_info(), ModelRegistry.remove_entry(), ModelRegistry._add_entry(), ModelRegistry._load_from_registry_dir()
- utopya.model_registry.utils module
Submodules#
utopya._cluster module#
This module holds functions used in the Multiverse’s cluster mode
- utopya._cluster.parse_node_list(node_list_str: str, *, mode: str, rcps: dict) List[str][source]#
Parses the node list to a list of node names and checks against the given resolved cluster parameters.
Depending on mode, different forms of the node list are parsable. For condensed mode:
    node042
    node[002,004-011,016]
    m05s[0204,0402,0504]
    m05s[0204,0402,0504],m08s[0504,0604,0701],m13s0603,m14s[0501-0502]
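To illustrate what the condensed forms above stand for, the following is a minimal, self-contained sketch of how such a string could be expanded into individual node names. It is not utopya's implementation: the helper names `split_top_level` and `expand_condensed` are hypothetical, and the check against the resolved cluster parameters (`rcps`) is left out.

```python
import re
from typing import List


def split_top_level(s: str) -> List[str]:
    """Split on commas that are not enclosed in square brackets."""
    parts, current, depth = [], [], 0
    for ch in s:
        if ch == "[":
            depth += 1
        elif ch == "]":
            depth -= 1
        if ch == "," and depth == 0:
            parts.append("".join(current))
            current = []
        else:
            current.append(ch)
    if current:
        parts.append("".join(current))
    return parts


def expand_condensed(node_list_str: str) -> List[str]:
    """Expand a condensed node list into a list of node names."""
    nodes = []
    for segment in split_top_level(node_list_str.strip()):
        match = re.fullmatch(r"([^\[\]]+)\[([^\[\]]+)\]", segment)
        if match is None:
            # A plain node name without a bracket expression
            nodes.append(segment)
            continue
        prefix, spec = match.groups()
        for item in spec.split(","):
            if "-" in item:
                # A range such as 004-011; keep the zero-padded width
                start, stop = item.split("-")
                width = len(start)
                nodes.extend(
                    f"{prefix}{i:0{width}d}"
                    for i in range(int(start), int(stop) + 1)
                )
            else:
                nodes.append(prefix + item)
    return nodes


print(expand_condensed("m13s0603,m14s[0501-0502]"))
# ['m13s0603', 'm14s0501', 'm14s0502']
```

Splitting only on top-level commas is what allows several bracketed groups to be chained in one string, as in the last example form.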
utopya._import_tools module#
Helper module that contains tools useful for module imports or manipulation
of the system path.
These are not implemented here but in dantro._import_tools.
Deprecated since version 1.0.1: This module is deprecated. Use dantro._import_tools or
utopya_backend.tools.import_package_from_dir() instead.
utopya._logging module#
Sets up logging, based on dantro’s logging features
- utopya._logging._utopya_LogRecordFactory(*args, **kwargs)[source]#
Custom log record factory.
When building the log record object, adds shortname in addition.
- utopya._logging._log = <DantroLogger utopya (REMARK)>#
The utopya root logger
utopya._resources module#
Defines some resources used internally throughout the package
utopya._signal module#
Implements signalling-related functionality and globally relevant data
- utopya._signal.SIGMAP = {'SIGABRT': 6, 'SIGALRM': 14, 'SIGBUS': 7, 'SIGCHLD': 17, 'SIGCLD': 17, 'SIGCONT': 18, 'SIGFPE': 8, 'SIGHUP': 1, 'SIGILL': 4, 'SIGINT': 2, 'SIGIO': 29, 'SIGIOT': 6, 'SIGKILL': 9, 'SIGPIPE': 13, 'SIGPOLL': 29, 'SIGPROF': 27, 'SIGPWR': 30, 'SIGQUIT': 3, 'SIGRTMAX': 64, 'SIGRTMIN': 34, 'SIGSEGV': 11, 'SIGSTOP': 19, 'SIGSYS': 31, 'SIGTERM': 15, 'SIGTRAP': 5, 'SIGTSTP': 20, 'SIGTTIN': 21, 'SIGTTOU': 22, 'SIGURG': 23, 'SIGUSR1': 10, 'SIGUSR2': 12, 'SIGVTALRM': 26, 'SIGWINCH': 28, 'SIGXCPU': 24, 'SIGXFSZ': 25, 'SIG_BLOCK': 0, 'SIG_DFL': 0, 'SIG_IGN': 1, 'SIG_SETMASK': 2, 'SIG_UNBLOCK': 1}#
A map from signal names to corresponding integer exit codes
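A mapping of this shape can be assembled from the standard library's signal module. The snippet below is a sketch under that assumption and not necessarily how utopya builds SIGMAP; the name `sigmap` is illustrative. The numeric values are platform-dependent, so the listing above reflects the host on which these docs were built.

```python
import signal

# Collect every attribute of the signal module whose name starts with "SIG"
# (this includes SIG_DFL, SIG_BLOCK, ...) and convert it to a plain integer.
sigmap = {
    name: int(getattr(signal, name))
    for name in dir(signal)
    if name.startswith("SIG")
}

print(sigmap["SIGINT"], sigmap["SIGTERM"])
```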
utopya._yaml module#
Supplies basic YAML interface, inherited from yayaml
utopya.batch module#
Implements batch running and evaluation of simulations
- utopya.batch._BTM_BASE_CFG_PATH = '/home/docs/checkouts/readthedocs.org/user_builds/utopya/checkouts/stable/utopya/cfg/btm_cfg.yml'#
Base configuration path of the batch task manager
- utopya.batch._BTM_BASE_CFG = {'cluster_mode': False, 'debug': False, 'parallelization_level': 'batch', 'paths': {'note': None, 'out_dir': '~/utopya_output/_batch'}, 'reporter': {'report_formats': {'progress_bar': {'info_fstr': '{prg[total]:>5.1f}% ', 'min_report_intv': 0.5, 'num_cols': 'adaptive', 'parser': 'progress_bar', 'show_times': True, 'times_fstr': '| {elapsed:>7s} elapsed | ~{est_left:>7s} left ', 'times_fstr_final': '| finished in {elapsed:} ', 'times_kwargs': {'mode': 'from_buffer', 'progress_buffer_size': 90}, 'write_to': 'stdout_noreturn'}, 'report_file': {'distributed_status_fstr': ' {progress_here:>5s} @ {host_name_short:12s} - {pid:7d}: {status:10s} ({tags})', 'max_num_to_show': 2048, 'min_num': 2, 'min_report_intv': 10, 'parser': 'report', 'show_distributed_run_info': True, 'show_exit_codes': True, 'show_host_info': True, 'show_individual_runtimes': True, 'task_label_plural': 'universes', 'task_label_singular': 'universe', 'write_to': {'file': {'path': '_report.txt', 'skip_if_dmv': False}}}}}, 'run_kwargs': {'timeout': None}, 'task_defaults': {'eval': {'create_symlinks': True, 'data_manager': {'out_dir_kwargs': {'exist_ok': False}}, 'out_dir': '{task_name:}', 'plot_manager': {'raise_exc': True}, 'priority': None}, 'run': {}}, 'tasks': {'eval': {}, 'run': {}}, 'worker_kwargs': {'forward_raw': True, 'forward_streams': False, 'popen_kwargs': {}, 'remove_ansi': True, 'save_raw': True, 'save_streams': True, 'streams_log_lvl': None}, 'worker_manager': {'interrupt_params': {'exit': False, 'grace_period': 5.0, 'send_signal': 'SIGTERM'}, 'lines_per_poll': 50, 'nonzero_exit_handling': 'warn_all', 'num_workers': 'auto', 'periodic_task_callback': 20, 'poll_delay': 0.05, 'rf_spec': {'after_cancel': ['progress_bar', 'report_file'], 'after_fail': ['report_file'], 'after_work': ['progress_bar', 'report_file'], 'before_working': [], 'monitor_updated': ['progress_bar'], 'task_finished': ['progress_bar', 'report_file'], 'task_invoked': [], 'task_skipped': 
['progress_bar', 'report_file'], 'task_spawned': ['progress_bar'], 'while_working': ['progress_bar']}, 'save_streams_on': ['periodic_callback'], 'spawn_rate': -1}}#
Actual base configuration of the batch task manager
- utopya.batch._BTM_USER_DEFAULTS = {}#
User defaults for the batch task manager
- utopya.batch._BTM_DEFAULTS = {'cluster_mode': False, 'debug': False, 'parallelization_level': 'batch', 'paths': {'note': None, 'out_dir': '~/utopya_output/_batch'}, 'reporter': {'report_formats': {'progress_bar': {'info_fstr': '{prg[total]:>5.1f}% ', 'min_report_intv': 0.5, 'num_cols': 'adaptive', 'parser': 'progress_bar', 'show_times': True, 'times_fstr': '| {elapsed:>7s} elapsed | ~{est_left:>7s} left ', 'times_fstr_final': '| finished in {elapsed:} ', 'times_kwargs': {'mode': 'from_buffer', 'progress_buffer_size': 90}, 'write_to': 'stdout_noreturn'}, 'report_file': {'distributed_status_fstr': ' {progress_here:>5s} @ {host_name_short:12s} - {pid:7d}: {status:10s} ({tags})', 'max_num_to_show': 2048, 'min_num': 2, 'min_report_intv': 10, 'parser': 'report', 'show_distributed_run_info': True, 'show_exit_codes': True, 'show_host_info': True, 'show_individual_runtimes': True, 'task_label_plural': 'universes', 'task_label_singular': 'universe', 'write_to': {'file': {'path': '_report.txt', 'skip_if_dmv': False}}}}}, 'run_kwargs': {'timeout': None}, 'task_defaults': {'eval': {'create_symlinks': True, 'data_manager': {'out_dir_kwargs': {'exist_ok': False}}, 'out_dir': '{task_name:}', 'plot_manager': {'raise_exc': True}, 'priority': None}, 'run': {}}, 'tasks': {'eval': {}, 'run': {}}, 'worker_kwargs': {'forward_raw': True, 'forward_streams': False, 'popen_kwargs': {}, 'remove_ansi': True, 'save_raw': True, 'save_streams': True, 'streams_log_lvl': None}, 'worker_manager': {'interrupt_params': {'exit': False, 'grace_period': 5.0, 'send_signal': 'SIGTERM'}, 'lines_per_poll': 50, 'nonzero_exit_handling': 'warn_all', 'num_workers': 'auto', 'periodic_task_callback': 20, 'poll_delay': 0.05, 'rf_spec': {'after_cancel': ['progress_bar', 'report_file'], 'after_fail': ['report_file'], 'after_work': ['progress_bar', 'report_file'], 'before_working': [], 'monitor_updated': ['progress_bar'], 'task_finished': ['progress_bar', 'report_file'], 'task_invoked': [], 'task_skipped': 
['progress_bar', 'report_file'], 'task_spawned': ['progress_bar'], 'while_working': ['progress_bar']}, 'save_streams_on': ['periodic_callback'], 'spawn_rate': -1}}#
Aggregated and recursively updated default batch task manager config
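"Recursively updated" here means that user defaults are layered over the base configuration key by key rather than replacing whole sub-dictionaries. The following generic sketch shows the idea; `recursive_update` is an illustrative helper defined here, not necessarily the function utopya uses, and the `"trial"` note is a made-up value.

```python
import copy


def recursive_update(base: dict, update: dict) -> dict:
    """Update ``base`` in place with ``update``, descending into nested dicts."""
    for key, value in update.items():
        if isinstance(value, dict) and isinstance(base.get(key), dict):
            recursive_update(base[key], value)
        else:
            base[key] = copy.deepcopy(value)
    return base


# Excerpt of the base configuration shown above
base_cfg = {
    "debug": False,
    "paths": {"note": None, "out_dir": "~/utopya_output/_batch"},
}
user_defaults = {"paths": {"note": "trial"}}  # hypothetical user entry

# Work on a copy so that the base configuration stays untouched
defaults = recursive_update(copy.deepcopy(base_cfg), user_defaults)
print(defaults["paths"])
```

Only `paths.note` changes; `paths.out_dir` is retained because the update descends into the nested dict instead of overwriting it.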
- utopya.batch.INVALID_TASK_NAME_CHARS = ('/', ':', '.', '?', '*')#
Substrings that may not appear in task names
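A check against these substrings could look as follows. This is a sketch only; the helper name `invalid_substrings` is hypothetical and the way utopya reports such errors is not shown here.

```python
INVALID_TASK_NAME_CHARS = ("/", ":", ".", "?", "*")


def invalid_substrings(task_name: str) -> list:
    """Return the forbidden substrings that occur in the given task name."""
    return [s for s in INVALID_TASK_NAME_CHARS if s in task_name]


print(invalid_substrings("sweep_01"))   # []
print(invalid_substrings("run:v1.0"))   # [':', '.']
```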
- utopya.batch._eval_task(*, task_name: str, _batch_name: str, _batch_dirs: dict, _task_cfg_path: str, _create_symlinks: bool, model_name: str, model_kwargs: dict = {}, use_data_tree_cache: bool | None = None, print_tree: bool | str = 'condensed', plot_only: Sequence[str] | None = None, plots_cfg: str | None = None, update_plots_cfg: dict = {}, **frozen_mv_kwargs)[source]#
The evaluation task target for the multiprocessing.Process. It sets up a
utopya.model.Model, loads the data, and performs plots.
- class utopya.batch.BatchTaskManager(*, batch_cfg_path: str | None = None, **update_batch_cfg)[source]#
Bases: object
A manager for batch tasks
- RUN_DIR_TIME_FSTR = '%y%m%d-%H%M%S'#
The time format string for the run directory
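As a quick illustration of this format string, the sketch below formats a fixed point in time and parses the result back using only the standard library. The chosen date is arbitrary.

```python
from datetime import datetime

RUN_DIR_TIME_FSTR = "%y%m%d-%H%M%S"

moment = datetime(2024, 3, 5, 14, 7, 9)
timestamp = moment.strftime(RUN_DIR_TIME_FSTR)
print(timestamp)  # 240305-140709

# The format is reversible, so a directory name can be turned back into a time
parsed = datetime.strptime(timestamp, RUN_DIR_TIME_FSTR)
```

Because the fields run from most to least significant, sorting such names alphabetically also sorts them chronologically (within the same century).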
- __init__(*, batch_cfg_path: str | None = None, **update_batch_cfg)[source]#
Sets up a BatchTaskManager.
- Parameters:
batch_cfg_path (str, optional) – The batch file with all the task definitions.
**update_batch_cfg – Additional arguments that are used to update the batch configuration.
- Raises:
NotImplementedError – If run_tasks or cluster_mode were set in the batch configuration.
- static _setup_batch_cfg(batch_cfg_path: str, **update_batch_cfg) dict[source]#
Sets up the BatchTaskManager configuration
- _setup_dirs(out_dir: str, note: str | None = None) Tuple[Dict[str, str], str][source]#
Sets up directories
- _add_tasks(tasks: dict, defaults: dict, add_task: Callable) int[source]#
Adds all configured run tasks to the WorkerManager’s task queue
- _add_eval_task(name: str, *, model_name: str, out_dir: str, enabled: bool = True, priority: int | None = None, create_symlinks: bool = False, **eval_task_kwargs)[source]#
Adds a single evaluation task to the WorkerManager.
- Parameters:
name (str) – Name of this task
model_name (str) – Model name; required in task, thus already requiring it here.
out_dir (str) – The path to the data output directory, i.e. the directory where all plots will end up. This may be a format string containing any of the following keys: task_name, model_name, timestamp, batch_name (combination of timestamp and the note). Relative paths are evaluated relative to the eval batch run directory.
enabled (bool, optional) – If False, will not add this task.
priority (int, optional) – Task priority; tasks with smaller value will be picked first.
create_symlinks (bool, optional) – Whether to create symlinks that add crosslinks between related directories, e.g.: from the output directory, link back to the task configuration; from the evaluation output directory alongside the simulation data, link to the batch output directory
**eval_task_kwargs – All further evaluation task arguments.
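The out_dir handling described above (format-string keys, relative paths resolved against the eval directory) can be sketched as follows. The helper `resolve_out_dir`, the example paths, and the exact shape of the `batch_name` value are assumptions made for illustration.

```python
import os


def resolve_out_dir(out_dir: str, *, eval_dir: str, **fstr_keys) -> str:
    """Fill in the format keys and anchor relative paths at ``eval_dir``."""
    path = out_dir.format(**fstr_keys)
    if not os.path.isabs(path):
        path = os.path.join(eval_dir, path)
    return path


keys = dict(
    task_name="my_task",
    model_name="MyModel",
    timestamp="240305-140709",
    batch_name="240305-140709_demo",  # hypothetical timestamp-plus-note form
)

# The default value '{task_name:}' ends up inside the eval directory
default = resolve_out_dir("{task_name:}", eval_dir="/tmp/batch/eval", **keys)

# Other keys can be combined freely
custom = resolve_out_dir(
    "/data/plots/{model_name}/{batch_name}", eval_dir="/tmp/batch/eval", **keys
)
print(default)
print(custom)
```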
utopya.cfg module#
Module that coordinates utopya’s persistent config directory
- utopya.cfg.UTOPYA_CFG_DIR = '/home/docs/.config/utopya'#
Path to the persistent utopya configuration directory
- utopya.cfg.UTOPYA_CFG_FILE_NAMES = {'batch': 'batch_cfg.yml', 'user': 'user_cfg.yml', 'utopya': 'utopya_cfg.yml'}#
Names and paths of valid configuration entries
- utopya.cfg.UTOPYA_CFG_FILE_PATHS = {'batch': '/home/docs/.config/utopya/batch_cfg.yml', 'user': '/home/docs/.config/utopya/user_cfg.yml', 'utopya': '/home/docs/.config/utopya/utopya_cfg.yml'}#
Absolute configuration file paths
- utopya.cfg.UTOPYA_CFG_SUBDIR_NAMES = {'models': 'models', 'projects': 'projects'}#
Names and paths of valid configuration subdirectories
- utopya.cfg.UTOPYA_CFG_SUBDIRS = {'models': '/home/docs/.config/utopya/models', 'projects': '/home/docs/.config/utopya/projects'}#
Absolute paths of the configuration subdirectories
- utopya.cfg.PROJECT_INFO_FILE_SEARCH_PATHS = ('.utopya_project.yml', '.utopya-project.yml')#
Potential names of project info files, relative to base directory
- utopya.cfg.get_cfg_path(cfg_name: str) str[source]#
Returns the absolute path to the specified configuration file
- utopya.cfg.load_from_cfg_dir(cfg_name: str) dict[source]#
Load a configuration file; returns empty dict if no file exists.
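The relation between the directory, the file names, and the absolute paths listed above can be sketched like this. It assumes the configuration directory sits at `~/.config/utopya` (inferred from the paths shown in this reference); `cfg_path_for` is a hypothetical stand-in, and the error it raises for unknown names is an assumption rather than utopya's documented behavior.

```python
import os

# Assumed location, inferred from the absolute paths shown above
CFG_DIR = os.path.expanduser(os.path.join("~", ".config", "utopya"))

CFG_FILE_NAMES = {
    "batch": "batch_cfg.yml",
    "user": "user_cfg.yml",
    "utopya": "utopya_cfg.yml",
}

# Absolute paths are the directory joined with each file name
CFG_FILE_PATHS = {
    name: os.path.join(CFG_DIR, fname) for name, fname in CFG_FILE_NAMES.items()
}


def cfg_path_for(cfg_name: str) -> str:
    """Look up the absolute path for a named configuration file."""
    try:
        return CFG_FILE_PATHS[cfg_name]
    except KeyError as err:
        valid = ", ".join(CFG_FILE_PATHS)
        raise ValueError(f"No config '{cfg_name}'; choose from: {valid}") from err


print(cfg_path_for("user"))
```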
utopya.exceptions module#
utopya-specific exception types
- exception utopya.exceptions.UtopyaException[source]#
Bases: BaseException
Base class for utopya-specific exceptions
- exception utopya.exceptions.ValidationError[source]#
Bases: UtopyaException, ValueError
Raised upon failure to validate a parameter
- exception utopya.exceptions.WorkerManagerError[source]#
Bases: UtopyaException
The base exception class for WorkerManager errors
- exception utopya.exceptions.WorkerManagerTotalTimeout[source]#
Bases: WorkerManagerError
Raised when a total timeout occurred
- exception utopya.exceptions.WorkerTaskError[source]#
Bases: WorkerManagerError
Raised when there was an error in a WorkerTask
- exception utopya.exceptions.WorkerTaskNonZeroExit(task: utopya.task.WorkerTask, *args, **kwargs)[source]#
Bases: WorkerTaskError
Can be raised when a WorkerTask exited with a non-zero exit code.
- __init__(task: utopya.task.WorkerTask, *args, **kwargs)[source]#
Initialize an error handling non-zero exit codes from workers
- exception utopya.exceptions.WorkerTaskStopConditionFulfilled(task: utopya.task.WorkerTask, *args, **kwargs)[source]#
Bases: WorkerTaskNonZeroExit
An exception that is raised when a worker-specific stop condition was fulfilled. This allows it to be handled separately from other non-zero exits.
- exception utopya.exceptions.WorkerTaskSetupError[source]#
Bases: WorkerTaskError
Raised upon errors in the worker task setup function
- exception utopya.exceptions.SkipWorkerTask(reason: str, *args, **kwargs)[source]#
Bases: WorkerTaskError
Raised to indicate that a worker task should be skipped.
- exception utopya.exceptions.WorkerTaskNotSkippable[source]#
Bases: WorkerTaskError
Raised when a worker task was NOT marked as skippable but a skip event was raised.
- exception utopya.exceptions.MultiverseError[source]#
Bases: UtopyaException
Base class for Multiverse-related exceptions
- exception utopya.exceptions.MultiverseRunAlreadyFinished[source]#
Bases: MultiverseError
Raised when a Multiverse run has already finished.
- exception utopya.exceptions.UniverseSetupError[source]#
Bases: MultiverseError
Raised on issues with universe during setup
- exception utopya.exceptions.UniverseOutputDirectoryError[source]#
Bases: UniverseSetupError
Raised on issues with universe output directory
- exception utopya.exceptions.SkipUniverse(reason: str, *args, **kwargs)[source]#
Bases: SkipWorkerTask, MultiverseError
Raised to indicate that a universe should be skipped.
- exception utopya.exceptions.SkipUniverseAfterSetup(reason: str, *args, **kwargs)[source]#
Bases: SkipUniverse
Raised to indicate that this universe (and all others) are deliberately skipped after their setup function was invoked.
- exception utopya.exceptions.YAMLRegistryError[source]#
Bases: UtopyaException, ValueError
Base class for errors in YAMLRegistry
- exception utopya.exceptions.EntryExistsError[source]#
Bases: YAMLRegistryError
Raised if an entry already exists
- exception utopya.exceptions.MissingEntryError[source]#
Bases: YAMLRegistryError
Raised if an entry is missing
- exception utopya.exceptions.MissingRegistryError[source]#
Bases: YAMLRegistryError
Raised if a registry is missing
- exception utopya.exceptions.EntryValidationError[source]#
Bases: YAMLRegistryError
Raised upon failed validation of a registry entry
- exception utopya.exceptions.SchemaValidationError[source]#
Bases: YAMLRegistryError
If schema validation failed
- exception utopya.exceptions.ModelRegistryError[source]#
Bases: UtopyaException, ValueError
Raised on errors with model registry
- exception utopya.exceptions.MissingModelError[source]#
Bases: ModelRegistryError
Raised when a model is missing
- exception utopya.exceptions.BundleExistsError[source]#
Bases: ModelRegistryError
Raised when a bundle that compared equal already exists
- exception utopya.exceptions.MissingBundleError[source]#
Bases: ModelRegistryError
Raised when a bundle is missing
- exception utopya.exceptions.BundleValidationError[source]#
Bases: ModelRegistryError
Raised when the result of validating the existence of a bundle fails
- exception utopya.exceptions.ProjectRegistryError[source]#
Bases: UtopyaException, ValueError
Raised on errors with project registry
- exception utopya.exceptions.MissingProjectError[source]#
Bases: ProjectRegistryError
Raised on a missing project
- exception utopya.exceptions.ProjectExistsError[source]#
Bases: ProjectRegistryError
Raised if a project or project file of that name already exists
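One practical consequence of the bases listed in this module: UtopyaException derives from BaseException, so exceptions that do not additionally inherit from ValueError are not caught by a plain `except Exception`. The sketch below demonstrates this with stand-in classes that mirror a few of the documented bases; they are defined locally and are not imported from utopya.

```python
# Stand-ins mirroring some of the bases documented above (not utopya's classes)
class UtopyaException(BaseException):
    pass


class WorkerManagerError(UtopyaException):
    pass


class WorkerTaskError(WorkerManagerError):
    pass


class ModelRegistryError(UtopyaException, ValueError):
    pass


class MissingModelError(ModelRegistryError):
    pass


def caught_by(exc: BaseException) -> str:
    """Report which of the two handlers catches the given exception."""
    try:
        raise exc
    except Exception:
        return "Exception"
    except UtopyaException:
        return "UtopyaException"


print(caught_by(MissingModelError("no such model")))  # Exception
print(caught_by(WorkerTaskError("task failed")))      # UtopyaException
```

Catching a base class such as WorkerManagerError or UtopyaException explicitly is therefore the more reliable way to handle the whole family.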
utopya.model module#
Provides the Model to work interactively with registered
utopya models
- class utopya.model.Model(*, name: str | None = None, info_bundle: ModelInfoBundle | None = None, bundle_label: str | None = None, base_dir: str | None = None, sim_errors: str | None = None, use_tmpdir: bool = False)[source]#
Bases: object
A class to work with Utopia models interactively.
It attaches to a certain model and makes it easy to load config files, create a Multiverse from them, run it, and work with it further…
- CONFIG_SET_MODEL_SOURCE_SUBDIRS = ('cfgs', 'cfg_sets', 'config_sets')#
Directories within the model source directories to search through when looking for configuration sets. These are not used if the utopya config contains an entry overwriting this.
- __init__(*, name: str | None = None, info_bundle: ModelInfoBundle | None = None, bundle_label: str | None = None, base_dir: str | None = None, sim_errors: str | None = None, use_tmpdir: bool = False)[source]#
Initialize the Model for the given model name
- Parameters:
name (str, optional) – Name of the model to attach to. If not given, need to pass info_bundle.
info_bundle (ModelInfoBundle, optional) – The required information to work with this model. If not given, will attempt to find the model in the model registry via name or bundle_label.
bundle_label (str, optional) – A label to use for identifying the info bundle.
base_dir (str, optional) – For convenience, can specify this path which will be seen as the base path for config files; if set, arguments that allow specifying configuration files can specify them relative to this directory.
sim_errors (str, optional) – Whether to raise errors from Multiverse
use_tmpdir (bool, optional) – Whether to use a temporary directory to write data to. The default value can be set here, but the flag can be overwritten in the create_mv and create_run_load methods. For False, the regular model output directory is used.
- Raises:
ValueError – Upon bad base_dir
- property info_bundle: ModelInfoBundle#
The model info bundle
- property name: str#
The name of this Model object, which is at the same time the name of the attached model.
- property base_dir: str#
Returns the path to the base directory, if set during init.
This is the path to a directory from which config files can be loaded using relative paths.
- property default_model_cfg: dict#
Returns the default model configuration by loading it from the file specified in the info bundle.
- property default_config_set_search_dirs: List[str]#
Returns the default config set search directories for this model in the order of precedence:
- defined on the project-level via cfg_set_abs_search_dirs; these may also be format strings supporting the following set of keys: model_name, project_base_dir, and model_source_dir (if set). If no project is associated, there will be no additional search directories.
- names of subdirectories relative to the model source directory, defined in cfg_set_model_source_subdirs. If no model source directory is known, no search directories will be added. If no project is associated, a standard set of search directories is used: cfgs, cfg_sets, config_sets, as defined in CONFIG_SET_MODEL_SOURCE_SUBDIRS.
Note
The output may contain relative paths.
- property default_config_sets: Dict[str, dict]#
Config sets at the default search locations.
To retrieve an individual config set, consider using get_config_set() instead of this property.
For more information, see Configuration Sets.
- create_mv(*, from_cfg: str | None = None, from_cfg_set: str | None = None, run_cfg_path: str | None = None, use_tmpdir: bool | None = None, **update_meta_cfg) Multiverse[source]#
Creates a utopya.multiverse.Multiverse for this model, optionally loading a configuration from a file and updating it with further keys.
- Parameters:
from_cfg (str, optional) – The name of the config file (relative to the base directory) to be used.
from_cfg_set (str, optional) – Name of the config set to retrieve the run config from. Mutually exclusive with from_cfg and run_cfg_path.
run_cfg_path (str, optional) – The path of the run config to use. Cannot be passed if from_cfg or from_cfg_set arguments were given.
use_tmpdir (bool, optional) – Whether to use a temporary directory to write the data to. If not given, uses default value set at initialization.
**update_meta_cfg – Can be used to update the meta configuration
- Returns:
The created Multiverse object
- Return type:
Multiverse
- Raises:
ValueError – If more than one of the run config selecting arguments (from_cfg, from_cfg_set, run_cfg_path) were given.
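The mutual exclusivity of the three run config selectors can be expressed with a small check like the one below. This is an illustrative sketch; the function name `select_run_cfg_source` and the error message are hypothetical and not taken from utopya.

```python
from typing import Optional, Tuple


def select_run_cfg_source(
    *,
    from_cfg: Optional[str] = None,
    from_cfg_set: Optional[str] = None,
    run_cfg_path: Optional[str] = None,
) -> Optional[Tuple[str, str]]:
    """Return the single selector that was given, or None if there was none."""
    candidates = {
        "from_cfg": from_cfg,
        "from_cfg_set": from_cfg_set,
        "run_cfg_path": run_cfg_path,
    }
    given = {name: val for name, val in candidates.items() if val is not None}
    if len(given) > 1:
        raise ValueError(
            f"Can only pass one of {', '.join(candidates)}, "
            f"but got: {', '.join(given)}"
        )
    return next(iter(given.items()), None)


print(select_run_cfg_source(from_cfg="sweep.yml"))  # ('from_cfg', 'sweep.yml')
```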
- create_frozen_mv(**fmv_kwargs) FrozenMultiverse[source]#
Create a utopya.multiverse.FrozenMultiverse, coupling it to a run directory.
Use this method if you want to load an existing simulation run.
- Parameters:
**fmv_kwargs – Passed on to utopya.multiverse.FrozenMultiverse.__init__()
- create_distributed_mv(**dmv_kwargs) DistributedMultiverse[source]#
Create a utopya.multiverse.DistributedMultiverse, coupling it to a run directory.
Use this method if you want to work on an existing simulation run.
- Parameters:
**dmv_kwargs – Passed on to utopya.multiverse.DistributedMultiverse.__init__()
- create_run_load(*, from_cfg: str | None = None, run_cfg_path: str | None = None, from_cfg_set: str | None = None, use_tmpdir: bool | None = None, print_tree: bool = True, **update_meta_cfg) Tuple[Multiverse, DataManager][source]#
Chains the create_mv(), mv.run, and mv.dm.load_from_cfg method calls together and returns a (Multiverse, DataManager) tuple.
- Parameters:
from_cfg (str, optional) – The name of the config file (relative to the base directory) to be used.
from_cfg_set (str, optional) – Name of the config set to retrieve the run config from. Mutually exclusive with from_cfg and run_cfg_path.
run_cfg_path (str, optional) – The path of the run config to use. Cannot be passed if from_cfg or from_cfg_set arguments were given.
use_tmpdir (bool, optional) – Whether to use a temporary directory to write the data to. If not given, uses default value set at initialization.
print_tree (bool, optional) – Whether to print the loaded data tree
**update_meta_cfg – Arguments passed to the create_mv function
- Returns:
- The created Multiverse and the
corresponding DataManager (with data already loaded).
- Return type:
Tuple[Multiverse, DataManager]
- get_config_set(name: str | None = None) Dict[str, str][source]#
Returns a configuration set: a dict containing paths to run and/or eval configuration files. These are accessible via the keys run and eval.
Config sets are retrieved from multiple locations:
- The cfgs directory in the model’s source directory
- The user-specified lookup directories, specified in the utopya configuration as config_set_search_dirs
- If name is an absolute or relative path, and a directory exists at the specified location, the parent directory is interpreted as a search path.
This uses get_config_sets() to retrieve all available configuration sets from the above paths and then selects the one with the given name. Config sets that are found later overwrite those with the same name found in previous searches and log a warning message (which can be controlled with the warn argument); sets are not merged.
For more information, see Configuration Sets.
- Parameters:
name (str, optional) – The name of the config set to retrieve. This may also be a local path, which is looked up prior to the default search directories.
- get_config_sets(*, search_dirs: List[str] | None = None, warn: bool = True, cfg_sets: dict | None = None) Dict[str, dict][source]#
Searches for all available configuration sets in the given search directories, aggregating them into one dict.
The search is done in reverse order of the paths given in search_dirs, i.e. starting from those directories with the lowest precedence. If configuration sets with the same name are encountered, warnings are emitted, but the one with higher precedence (appearing more towards the front of search_dirs, i.e. the later-searched one) will take precedence.
Note
This will not merge configuration sets from different search directories, e.g. if one contained only an eval configuration and the other contained only a run configuration, a warning will be emitted but the entry from the later-searched directory will be used.
- Parameters:
search_dirs (List[str], optional) – The directories to search sequentially for config sets. If not given, will use the default config set search directories, see default_config_set_search_dirs.
warn (bool, optional) – Whether to warn (via log message), if the search yields a config set with a name that already existed.
cfg_sets (dict, optional) – If given, aggregate newly found config sets into this dict. Otherwise, start with an empty one.
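The precedence rule described above (search in reverse order, later finds replace earlier ones without merging) can be sketched as follows. To stay self-contained, the sketch replaces the file system scan with an in-memory mapping from directory to the config sets found there; `aggregate_config_sets`, the directory names, and the file paths are all hypothetical.

```python
import logging
from typing import Dict, List, Optional

log = logging.getLogger("config_sets_sketch")


def aggregate_config_sets(
    found: Dict[str, Dict[str, dict]],
    search_dirs: List[str],
    *,
    warn: bool = True,
    cfg_sets: Optional[dict] = None,
) -> Dict[str, dict]:
    """Aggregate config sets; directories at the front of ``search_dirs`` win."""
    cfg_sets = {} if cfg_sets is None else cfg_sets

    # Start with the lowest-precedence directory, i.e. the last one given
    for search_dir in reversed(search_dirs):
        for name, cfg_set in found.get(search_dir, {}).items():
            if name in cfg_sets and warn:
                log.warning(
                    "Config set '%s' from '%s' replaces an earlier one.",
                    name, search_dir,
                )
            # Replace the whole entry; run and eval paths are NOT merged
            cfg_sets[name] = cfg_set
    return cfg_sets


found = {
    "low": {"demo": {"run": "low/demo/run.yml", "eval": "low/demo/eval.yml"}},
    "high": {"demo": {"run": "high/demo/run.yml"}},
}
result = aggregate_config_sets(found, ["high", "low"])
print(result["demo"])  # {'run': 'high/demo/run.yml'}
```

Note that the resulting `demo` set has no eval entry: the set from the higher-precedence directory replaces the other one entirely, matching the note above.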
- _store_mv(mv: Multiverse, **kwargs) None[source]#
Stores a created Multiverse object and all the kwargs in a dict
- _create_tmpdir() TemporaryDirectory[source]#
Create a TemporaryDirectory
utopya.multiverse module#
Implementation of the Multiverse class, which
sits at the heart of utopya and supplies the main user interface for the
frontend. It allows running a simulation and then evaluating it.
- class utopya.multiverse.Multiverse(*, model_name: str | None = None, info_bundle: ModelInfoBundle | None = None, run_cfg_path: str | None = None, user_cfg_path: str | None = None, _shared_worker_manager: WorkerManager | None = None, **update_meta_cfg)[source]#
Bases: object
The Multiverse is where a single simulation run is orchestrated from.
It spawns multiple universes, each of which represents a single simulation of the selected model with the parameters specified by the meta configuration. The WorkerManager takes care to perform these simulations in parallel.
The Multiverse then interfaces with the dantro data processing pipeline using classes specialized in utopya.eval: The DataManager loads the created simulation output, making it available in a uniformly accessible and hierarchical data tree. Then, the PlotManager handles plotting of that data.
- RUN_DIR_TIME_FSTR = '%y%m%d-%H%M%S'#
The time format string for the run directory
- RUN_SUBDIRS: Tuple[str, ...] = ('config', 'data', 'eval', '.cache')#
Names of managed subdirectories of the run directory.
- RUN_SUBDIRS_REQUIRED: Tuple[str, ...] = ('config', 'data', 'eval')#
Names of required subdirectories; these are guaranteed to exist.
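The guarantee that the required subdirectories exist can be pictured with the following sketch, which creates a run directory with these subdirectories inside a temporary location. The helper `create_run_dir` and the directory name are illustrative assumptions and do not reproduce how the Multiverse sets up its directories.

```python
import tempfile
from pathlib import Path

RUN_SUBDIRS_REQUIRED = ("config", "data", "eval")


def create_run_dir(run_dir: Path) -> dict:
    """Create a fresh run directory including all required subdirectories."""
    run_dir.mkdir(parents=True, exist_ok=False)  # refuse to reuse a directory
    dirs = {"run": run_dir}
    for name in RUN_SUBDIRS_REQUIRED:
        dirs[name] = run_dir / name
        dirs[name].mkdir()
    return dirs


with tempfile.TemporaryDirectory() as tmp:
    dirs = create_run_dir(Path(tmp) / "240305-140709")
    created = sorted(p.name for p in dirs["run"].iterdir())

print(created)  # ['config', 'data', 'eval']
```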
- BASE_META_CFG_PATH = '/home/docs/checkouts/readthedocs.org/user_builds/utopya/checkouts/stable/utopya/cfg/base_cfg.yml'#
Where the default meta-configuration can be loaded from.
- UTOPYA_BASE_PLOTS_PATH = '/home/docs/checkouts/readthedocs.org/user_builds/utopya/checkouts/stable/utopya/cfg/base_plots.yml'#
Where the utopya base plots configuration can be found; this is passed to the PlotManager.
- USER_CFG_SEARCH_PATH = '/home/docs/.config/utopya/user_cfg.yml'#
Where to look for the user configuration
- __init__(*, model_name: str | None = None, info_bundle: ModelInfoBundle | None = None, run_cfg_path: str | None = None, user_cfg_path: str | None = None, _shared_worker_manager: WorkerManager | None = None, **update_meta_cfg)[source]#
Initialize the Multiverse.
- Parameters:
model_name (str, optional) – The name of the model to run
info_bundle (ModelInfoBundle, optional) – The model information bundle that includes information about the executable path etc. If not given, will attempt to read it from the model registry.
run_cfg_path (str, optional) – The path to the run configuration.
user_cfg_path (str, optional) – If given, this is used to update the base configuration. If None, will look for it in the default path, see Multiverse.USER_CFG_SEARCH_PATH.
_shared_worker_manager (WorkerManager, optional) –
If given, this already existing WorkerManager instance (and its reporter) will be used instead of initializing new instances.
Warning
This argument is only exposed for internal purposes. It should not be used for production code and behavior of this argument may change at any time.
**update_meta_cfg – Can be used to update the meta configuration generated from the previous configuration levels
- property info_bundle: ModelInfoBundle#
The model info bundle for this Multiverse
- property model: utopya.model.Model#
A model instance, created ad-hoc using the associated info bundle
- property status_file_paths: List[str]#
Retrieves status file paths in this Multiverse’s run directory
- property resolved_cluster_params: dict#
Returns a copy of the cluster configuration with all parameters resolved. This makes some additional keys available on the top level.
- property dm: DataManager#
The Multiverse’s DataManager.
- property wm: WorkerManager#
The Multiverse’s WorkerManager.
- property pm: PlotManager#
The Multiverse’s PlotManager.
- run(*, sweep: bool | None = None)[source]#
Starts a simulation run.
Specifically, this method adds simulation tasks to the associated WorkerManager, locks its task list, and then invokes the start_working() method which performs all the simulation tasks.
If cluster mode is enabled, this will split up the parameter space into (ideally) equally sized parts and only run one of these parts, depending on the cluster node this Multiverse is being invoked on.
Note
As this method locks the task list of the WorkerManager, no further tasks can be added afterwards. This means that each Multiverse instance can only perform a single simulation run.
- Parameters:
sweep (bool, optional) – Whether to perform a sweep or not. If None, the value will be read from the perform_sweep key of the meta-configuration.
- run_single()[source]#
Runs a single simulation using the parameter space’s default value.
See run() for more information.
- renew_plot_manager(**update_kwargs)[source]#
Tries to set up a new PlotManager. If this succeeds, the old one is discarded and the new one is associated with this Multiverse.
- Parameters:
**update_kwargs – Passed on to PlotManager.__init__
- classmethod _load_user_cfg(user_cfg_path: str | None = None) Tuple[str, dict][source]#
Loads the user configuration from a path; if no path is given, searches for it …
- classmethod _load_meta_cfg_parts(*, info_bundle: ModelInfoBundle, user_cfg_path: str | None = None) Tuple[Dict[str, str | None], Dict[str, dict | None]][source]#
Loads the various parts of the meta-configuration for a model and returns a dict of their paths and one of the loaded dictionaries.
- classmethod _assemble_meta_cfg_base_layers(*, info_bundle: ModelInfoBundle, user_cfg_path: str | None = None) Tuple[dict, Dict[str, str], Dict[str, dict]][source]#
Assembles the meta-configuration base layers, i.e. without the model default configuration or any run-specific updates.
It includes the following layers:
base
framework
project
model_mv (model-specific multiverse updates)
user
Other layers are applied later.
Returns a 3-tuple of (assembled meta config, cfg paths, cfg dicts).
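The layering described above can be illustrated with a minimal sketch. It is not utopya's implementation: the helper names `recursive_update` and `assemble_layers` as well as the example keys `worker_manager`, `num_workers`, and `poll_delay` are assumptions made for illustration. Only the layer names and their order are taken from the list above.

```python
from copy import deepcopy

# Layer names and order as listed above
LAYER_ORDER = ("base", "framework", "project", "model_mv", "user")


def recursive_update(base: dict, update: dict) -> dict:
    """Merge ``update`` into ``base`` in place: nested dicts are merged,
    all other values are overwritten."""
    for key, val in update.items():
        if isinstance(val, dict) and isinstance(base.get(key), dict):
            recursive_update(base[key], val)
        else:
            base[key] = deepcopy(val)
    return base


def assemble_layers(cfg_dicts: dict) -> dict:
    """Hypothetical helper: apply the available layers in order, each one
    updating the result of the previous layers."""
    meta_cfg = {}
    for name in LAYER_ORDER:
        layer = cfg_dicts.get(name)
        if layer:  # a layer may be missing or None
            recursive_update(meta_cfg, layer)
    return meta_cfg


# Made-up layer contents, for illustration only
cfg_dicts = {
    "base": {
        "worker_manager": {"num_workers": "auto", "poll_delay": 0.05},
        "perform_sweep": False,
    },
    "framework": None,
    "project": {"worker_manager": {"num_workers": 4}},
    "user": {"perform_sweep": True},
}
meta_cfg = assemble_layers(cfg_dicts)
print(meta_cfg)
```

Later layers only overwrite the keys they specify; everything else is inherited from the layers below.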
- _create_meta_cfg(*, run_cfg_path: str, user_cfg_path: str, update_meta_cfg: dict) dict[source]#
Create the meta configuration from several parts and store it.
The final configuration dict is built from multiple components, where each component recursively updates the previous level. The resulting configuration is the meta configuration and is stored to the meta_cfg attribute.
The parts are recorded in the cfg_parts dict and returned such that a backup can be created.
- Parameters:
- Returns:
- dict of the parts that were needed to create the meta config.
The dict-key corresponds to the part name, the value is the payload which can be either a path to a cfg file or a dict
- Return type:
- _apply_debug_level(lvl: int | None = None)[source]#
Depending on the debug level, applies certain settings to the Multiverse and the runtime environment.
Note
This does not (yet) set the corresponding debug flags for the PlotManager, DataManager, or WorkerManager!
- _create_run_dir(*, out_dir: str, model_note: str | None = None, dir_permissions: dict | None = None) None[source]#
Create the folder structure for the run output.
For the chosen model name and current timestamp, the run directory will be of form <timestamp>_<model_note> and be part of the following directory tree:
utopya_output
    model_a
        180301-125410_my_model_note
            config
            data
                uni000
                uni001
                ...
            eval
    model_b
        180301-125412_my_first_sim
        180301-125413_my_second_sim
If running in cluster mode, the cluster parameters are resolved and used to determine the name of the simulation. The pattern then does not include a timestamp as each node might return not quite the same value. Instead, a value from an environment variable is used. The resulting path can have different forms, depending on which environment variables were present; required parts are denoted by a * in the following pattern; if the value of the other entries is not available, the connecting underscore will not be used:
{timestamp}_{job id*}_{cluster}_{job account}_{job name}_{note}
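As a rough sketch of how such a name could be assembled from the pattern above: the function name `cluster_run_dir_name` and its keyword arguments are hypothetical, and the values would in practice come from the resolved cluster parameters rather than being passed by hand.

```python
def cluster_run_dir_name(
    *,
    job_id,
    timestamp=None,
    cluster=None,
    job_account=None,
    job_name=None,
    note=None,
) -> str:
    """Hypothetical helper: join the available parts with underscores,
    following the pattern
    {timestamp}_{job id*}_{cluster}_{job account}_{job name}_{note}.
    Only the job id is required; missing parts are left out together
    with their connecting underscore."""
    if not job_id:
        raise ValueError("The job id is a required part of the name!")
    parts = (timestamp, job_id, cluster, job_account, job_name, note)
    return "_".join(str(part) for part in parts if part)


print(cluster_run_dir_name(job_id="12345", note="test"))
print(
    cluster_run_dir_name(
        timestamp="180301-125410", job_id="12345", job_name="sweep"
    )
)
```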
- Parameters:
out_dir (str) – The base output directory, where all Utopia output is stored.
model_note (str, optional) – The note to add to the run directory of the current run.
dir_permissions (Dict[str, str]) – If given, will set directory permissions on the specified managed directories of this Multiverse. The keys of this dict should be entries of the dirs attribute, values should be octal permissions values given as a string.
- Raises:
RuntimeError – If the simulation directory already existed. This should not occur, as the timestamp is unique. If it occurs, you either started two simulations very close to each other or something is seriously wrong. Strange time zone perhaps?
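The non-cluster naming scheme and directory layout can be sketched with the standard library alone. This is an illustration under assumptions, not the method's actual code: `create_run_dir` and its `now` argument are hypothetical, while the format string and the required subdirectory names are the class attributes documented above.

```python
import tempfile
from datetime import datetime
from pathlib import Path

# Values of the class attributes documented above
RUN_DIR_TIME_FSTR = "%y%m%d-%H%M%S"
RUN_SUBDIRS_REQUIRED = ("config", "data", "eval")


def create_run_dir(out_dir, model_name, *, model_note=None, now=None) -> Path:
    """Hypothetical helper: create <out_dir>/<model_name>/<timestamp>_<note>
    including the required subdirectories."""
    now = now or datetime.now()
    name = now.strftime(RUN_DIR_TIME_FSTR)
    if model_note:
        name += f"_{model_note}"

    run_dir = Path(out_dir).expanduser() / model_name / name
    try:
        # An already existing run directory is treated as an error
        run_dir.mkdir(parents=True, exist_ok=False)
    except FileExistsError as err:
        raise RuntimeError(f"Run directory already exists: {run_dir}") from err

    for subdir in RUN_SUBDIRS_REQUIRED:
        (run_dir / subdir).mkdir()
    return run_dir


with tempfile.TemporaryDirectory() as tmp:
    run_dir = create_run_dir(
        tmp,
        "model_a",
        model_note="my_model_note",
        now=datetime(2018, 3, 1, 12, 54, 10),
    )
    run_dir_name = run_dir.name
    created = sorted(p.name for p in run_dir.iterdir())

print(run_dir_name, created)
```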
- _get_run_dir(*, out_dir: str | None, run_dir: str | None, **__)[source]#
Helper function to find the run directory from arguments given to __init__(). This is not actually used in Multiverse but in FrozenMultiverse and DistributedMultiverse.
- Parameters:
- Raises:
- _setup_pm(**update_kwargs) PlotManager[source]#
Helper function to setup a PlotManager instance
- _parse_base_cfg_pools(base_cfg_pools: List[str | Tuple[str, dict | str]]) List[Tuple[str, dict | str]][source]#
Prepares the base_cfg_pools argument to be valid input to the PlotManager. This method resolves format strings and thus allows base config pools to be defined more generically.
Possible formats for each entry of the base_cfg_pools argument are:
A 2-tuple (name, pool dict) which specifies the name of the base config pool along with the pool entries.
A 2-tuple (name, path to pool config file), which is later loaded by the PlotManager.
A shortcut key which resolves to the corresponding 2-tuple. Available shortcuts are: utopya_base, framework_base, project_base, and model_base.
Both the pool name and path may be format strings which get resolved with the model_name key and (in the case of the path) the full paths dict of the current model’s info bundle. A format string may look like this:
"{paths[source_dir]}/{model_name}_more_plots.yml"
"~/some/more/plots/{model_name}/plots.yml"
If such a path cannot be resolved, an error is logged and an empty pool is used instead; this allows for more flexibility in defining locations for additional config pools.
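How such format strings resolve can be shown with plain `str.format`. The helper `resolve_pool_entry`, the model name, and the example path are assumptions for illustration; the error handling described above (logging and falling back to an empty pool) is not part of this sketch.

```python
def resolve_pool_entry(name: str, path: str, *, model_name: str, paths: dict):
    """Hypothetical helper: resolve the format strings in a
    (name, path) base config pool entry."""
    # The pool name only has access to the model name ...
    name = name.format(model_name=model_name)
    # ... while the path may additionally index into the paths dict
    path = path.format(model_name=model_name, paths=paths)
    return name, path


# Made-up model name and paths dict
entry = resolve_pool_entry(
    "{model_name}_extra",
    "{paths[source_dir]}/{model_name}_more_plots.yml",
    model_name="MyModel",
    paths={"source_dir": "/src/MyModel"},
)
print(entry)
```

Note that inside a format string the dict key is written without quotes, i.e. `{paths[source_dir]}`.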
- _perform_backup(*, cfg_parts: dict, backup_cfg_files: bool = True, backup_executable: bool = False, include_git_info: bool = True) None[source]#
Performs a backup of the information that can be used to recreate a simulation.
The configuration files are backed up into the config subdirectory of the run directory. All other relevant information is stored in an additionally created backup subdirectory.
Warning
These backups are created prior to the start of the actual simulation run and contain information known at that point. Any changes to the meta configuration made after initialization of the Multiverse will not be reflected in these backups.
In particular, the perform_sweep and parameter_space entries of the meta configuration may not reflect which form of parameter space iteration was actually performed, because the run_single and run_sweep methods overwrite this behavior. To that end, that information is separately stored once the run methods are invoked.
- Parameters:
cfg_parts (dict) – A dict of either paths to configuration files or dict-like data that is to be dumped into a configuration file.
backup_cfg_files (bool, optional) – Whether to backup the individual configuration files (i.e. the cfg_parts information). If false, the meta configuration will still be backed up.
backup_executable (bool, optional) – Whether to backup the executable. Note that these files can sometimes be quite large.
include_git_info (bool, optional) – If True, will store information about the state of the project’s (and framework’s, if existent) git repository.
- _perform_pspace_backup(pspace: ParamSpace, *, filename: str = 'parameter_space', **info_kwargs)[source]#
Stores the given parameter space and its metadata into the config directory. Two files will be produced:
config/{filename}.yml: the passed pspace object
config/{filename}_info.yml: the passed pspace object’s info dictionary containing relevant metadata (and the additionally passed info_kwargs)
Note
This method is separated from the regular backup method Multiverse._perform_backup() because the parameter space that is used during a simulation run may be a lower-dimensional version of the one the Multiverse was initialized with. To that end, run() will invoke this backup function again once the relevant information is fully determined. This is important because it is needed to communicate the correct information about the sweep to objects downstream in the pipeline (e.g. MultiversePlotCreator).
- Parameters:
pspace (paramspace.paramspace.ParamSpace) – The ParamSpace object to save as backup.
filename (str, optional) – The filename (without extension!) to use. (This is also used for the log message, with underscores changed to spaces.)
**info_kwargs – Additional kwargs that are to be stored in the metadata dict.
- _prepare_executable(*, run_from_tmpdir: bool = False, prefix: Tuple[str, ...] | None = None) None[source]#
Prepares the model executable, potentially copying it to a temporary location.
Also allows specifying a prefix to the model executable, which can be used to control how the model is invoked.
Note
The run_from_tmpdir argument requires the executable to be relocatable to another location, i.e. be position-independent.
Also, copying the executable to a temporary directory might not suffice in isolating it from all system changes, e.g. if dependencies that are imported during runtime change!
- Parameters:
run_from_tmpdir (bool, optional) – Whether to copy the executable to a temporary directory that goes out of scope once the Multiverse instance goes out of scope.
prefix (Tuple[str, ...], optional) – These arguments are prefixed to the model invocation. For instance, if this is ('python',), the resulting invocation command will be:
python path/to/model/executable.py path/to/uni/run_cfg.yml
- Raises:
FileNotFoundError – On missing file at model binary path
PermissionError – On wrong access rights of file at the binary path
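The effect of the prefix argument on the invocation can be illustrated as follows. `build_invocation` is a hypothetical helper that only shows how the argument tuple is composed; the paths are the placeholder paths from the example above.

```python
def build_invocation(executable: str, uni_cfg_path: str, prefix=None) -> tuple:
    """Hypothetical helper: prepend the prefix arguments (if any) to the
    executable path and the universe configuration path."""
    return tuple(prefix or ()) + (executable, uni_cfg_path)


args = build_invocation(
    "path/to/model/executable.py",
    "path/to/uni/run_cfg.yml",
    prefix=("python",),
)
command = " ".join(args)
print(command)
```

Without a prefix, the tuple starts with the executable itself, which then needs to be directly executable.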
- _resolve_cluster_params() dict[source]#
This resolves the cluster parameters, e.g. by setting parameters depending on certain environment variables. This function is called by the resolved_cluster_params property.
- Returns:
The resolved cluster configuration parameters
- Return type:
- Raises:
ValueError – If a required environment variable was missing or empty
- _setup_universe(*, worker_kwargs: dict, model_name: str, model_executable: str, args_prefix: Tuple[str, ...], uni_cfg: dict, uni_basename: str) dict[source]#
Setup function for individual universes. These are realised through individual WorkerTask instances, where this function is called as part of the setup routine, before a task is run.
This is called before the worker process starts working on the universe.
- Parameters:
worker_kwargs (dict) – the current status of the worker_kwargs dictionary; is always passed to a task setup function
model_name (str) – The name of the model
model_executable (str) – path to the binary to execute
args_prefix (Tuple[str, ...]) – arguments that are prefixed to the model executable
uni_cfg (dict) – the configuration to create a yml file from which is then needed by the model
uni_basename (str) – Basename of the universe to use for folder creation, i.e.: zero-padded universe number, e.g. uni0042
- Returns:
- kwargs for the process to be run when the task is grabbed by Worker.
- Return type:
- _setup_universe_dir(uni_dir: str, *, uni_basename: str) None[source]#
Determines the universe directory and, if needed, creates it.
This is invoked from _setup_universe() and is carried out directly before work on that universe starts.
- Parameters:
uni_basename (str) – The basename of the universe to create the run directory for.
- _setup_universe_config(*, uni_cfg: dict, uni_dir: str, uni_cfg_path: str, mode: str = 'x') dict[source]#
Sets up the universe configuration and writes it to a file.
This is invoked from _setup_universe() and is carried out directly before work on that universe starts.
- Parameters:
- Returns:
The (potentially updated) universe configuration
- Return type:
- _setup_universe_worker_kwargs(*, model_executable: str, args_prefix: Tuple[str, ...], uni_cfg_path: str, uni_cfg: dict, uni_dir: str, save_streams: bool = False, **worker_kwargs) dict[source]#
Assembles worker kwargs for a specific universe.
This is invoked from _setup_universe() and is carried out directly before work on that universe starts.
- Returns:
- the combined worker kwargs, including args for running the model executable.
- Return type:
- _maybe_skip(context: str, *, desc: str, exc: Exception | None = None)[source]#
Called from the universe setup function in certain situations, this method checks how to proceed. It may trigger skipping of the task, raise an error (e.g. if skipping is disabled), or just continue without either of those, potentially causing an error later.
- _add_sim_task(*, uni_id_str: str, uni_cfg: dict, is_sweep: bool) None[source]#
Helper function that handles task assignment to the WorkerManager.
This function creates a WorkerTask that will perform the following actions once it is grabbed and worked at:
Create a universe (folder) for the task (simulation)
Write that universe’s configuration to a yaml file in that folder
Create the correct arguments for the call to the model binary
To that end, this method gathers all necessary arguments and registers a WorkerTask with the WorkerManager.
- Parameters:
uni_id_str (str) – The zero-padded uni id string
uni_cfg (dict) – given by ParamSpace. Defines how many simulations should be started
is_sweep (bool) – Flag is needed to distinguish between sweeps and single simulations. With this information, the forwarding of a simulation’s output stream can be controlled.
- Raises:
RuntimeError – If adding the simulation task failed
- _add_sim_tasks(*, sweep: bool | None = None) int[source]#
Adds the simulation tasks needed for a single run or for a sweep.
- Parameters:
sweep (bool, optional) – Whether tasks for a parameter sweep should be added or only for a single universe. If None, will read the perform_sweep key from the meta-configuration.
- Returns:
The number of added tasks.
- Return type:
- Raises:
ValueError – On sweep == True and zero-volume parameter space.
- _validate_meta_cfg() bool[source]#
Goes through the parameters that require validation, validates them, and creates a useful error message if there were invalid parameters.
- Returns:
- True if all parameters are valid; None if no check was done.
Note that False will never be returned, but a ValidationError will be raised instead.
- Return type:
- Raises:
ValidationError – If validation failed.
- class utopya.multiverse.FrozenMultiverse(*, model_name: str | None = None, info_bundle: ModelInfoBundle | None = None, run_dir: str | None = None, run_cfg_path: str | None = None, user_cfg_path: str | None = None, use_meta_cfg_from_run_dir: bool = False, **update_meta_cfg)[source]#
Bases: Multiverse
A frozen Multiverse is like a Multiverse, but frozen.
It is initialized from a finished Multiverse run and re-creates all the attributes from that data, e.g.: the meta configuration, a DataManager, and a PlotManager.
Note
A frozen multiverse is no longer able to perform any simulations.
- __init__(*, model_name: str | None = None, info_bundle: ModelInfoBundle | None = None, run_dir: str | None = None, run_cfg_path: str | None = None, user_cfg_path: str | None = None, use_meta_cfg_from_run_dir: bool = False, **update_meta_cfg)[source]#
Initializes the FrozenMultiverse from a model name and the name of a run directory.
Note that this also takes arguments to specify the run configuration to use.
- Parameters:
model_name (str) – The name of the model to load. From this, the model output directory is determined and the run_dir will be seen as relative to that directory.
info_bundle (ModelInfoBundle, optional) – The model information bundle that includes information about the binary path etc. If not given, will attempt to read it from the model registry.
run_dir (str, optional) – The run directory to load. Can be a path relative to the current working directory, an absolute path, or the timestamp of the run directory. If not given, will use the most recent timestamp.
run_cfg_path (str, optional) – The path to the run configuration.
user_cfg_path (str, optional) – If given, this is used to update the base configuration. If None, will look for it in the default path, see Multiverse.USER_CFG_SEARCH_PATH.
use_meta_cfg_from_run_dir (bool, optional) – If True, will load the meta configuration from the given run directory; only works for absolute run directories.
**update_meta_cfg – Can be used to update the meta configuration generated from the previous configuration levels
- class utopya.multiverse.DistributedMultiverse(*, run_dir: str, model_name: str | None = None, info_bundle: ModelInfoBundle | None = None, no_reports: bool = False)[source]#
Bases: FrozenMultiverse
A distributed Multiverse is like a Multiverse, but initialized from an existing meta-configuration.
Unlike the FrozenMultiverse, it is able to continue, join or repeat an existing simulation run.
- __init__(*, run_dir: str, model_name: str | None = None, info_bundle: ModelInfoBundle | None = None, no_reports: bool = False)[source]#
Initializes a DistributedMultiverse from a model name and an existing run directory.
- Parameters:
run_dir (str, optional) – The run directory to load. Can be a path relative to the current working directory, an absolute path, or the timestamp of the run directory. If not given, will use the most recent timestamp.
model_name (str) – The name of the model to load. From this, the model output directory is determined and the run_dir will be seen as relative to that directory.
info_bundle (ModelInfoBundle, optional) – The model information bundle that includes information about the binary path etc. If not given, will attempt to read it from the model registry.
no_reports (bool, optional) – If True, will not write work status or other simulation report files. Set this when invoking this with many individual universes, in order to avoid creating as many report files.
- run_single(*_, **__)[source]#
Runs a single simulation using the parameter space’s default value.
See run() for more information.
- run(*, universes: Literal['all'] | str | List[str] = 'all', num_workers: int | None = None, timeout: float | None = None, on_existing_uni_dir: str = 'continue', on_existing_uni_cfg: str = 'continue', on_existing_uni_output: str = 'raise')[source]#
Starts a simulation run for all or a specified subset of universes, working on the existing run directory.
Using the on_existing_uni_output argument, it is possible to skip universes that already created output; alternatively, the output can be removed, effectively repeating the universe simulation.
- Parameters:
universes (Union[Literal["all"], str, List[str]], optional) –
Which universes to run again. Can either be all (default) to run all universes, or a selection of universe IDs. The selection can be given as a list of ID strings or a string of comma-separated IDs. Examples of valid formats:
['uni01', 'uni02', 'uni03']
'uni01,uni02,uni03'
['01', '02', '03']
1,2,3
Leading zeros and uni are optional.
num_workers (int, optional) – Specify the number of workers to use, overwriting the setting from the meta-configuration.
timeout (float, optional) – If given, overwrites the existing value for the WorkerManager timeout, which may have been set in the original Multiverse run.
on_existing_uni_dir (str, optional) – How to proceed if a universe directory already exists; can be skip, raise, or continue. Set this to continue if you previously generated all universe output directories.
on_existing_uni_cfg (str, optional) – How to proceed if a universe configuration already exists; can be skip, raise, or continue. Set this to continue if you previously generated all universe config files.
on_existing_uni_output (str, optional) – What to do if universe output already exists. Options are skip, raise, continue or clear; the latter will remove existing output files without prompting for this again!
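The accepted formats of the universes argument can be normalized along the following lines. This is only a sketch of the described behavior: `parse_universe_ids` and `uni_basename` are hypothetical helpers, and the padding width is an assumption.

```python
def parse_universe_ids(universes):
    """Hypothetical helper: normalize a universe selection to a list of
    integer IDs; the literal "all" is passed through unchanged."""
    if universes == "all":
        return "all"
    if isinstance(universes, str):
        universes = universes.split(",")

    ids = []
    for entry in universes:
        entry = str(entry).strip()
        if entry.startswith("uni"):  # the "uni" prefix is optional
            entry = entry[len("uni"):]
        ids.append(int(entry))  # int() also drops leading zeros
    return ids


def uni_basename(uni_id: int, width: int = 4) -> str:
    """Zero-padded universe basename, e.g. uni0042 (width is an assumption)."""
    return f"uni{uni_id:0{width}d}"


print(parse_universe_ids("uni01,uni02,uni03"))
print(parse_universe_ids(["01", "02", "03"]))
print(uni_basename(42))
```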
- join_run(*, num_workers: int | None = None, shuffle_tasks: bool = True, timeout: float | None = None)[source]#
Joins an already-running simulation and performs tasks that have not been taken up yet.
- Parameters:
num_workers (int, optional) – Set number of workers to use.
shuffle_tasks (bool, optional) – If given, will overwrite the shuffle_tasks run arguments. When joining an already-running simulation run, it is advisable to set this to True to reduce competition for new tasks.
timeout (float, optional) – If given, will overwrite the existing value for the WorkerManager timeout, which may have been set in the original Multiverse run.
- _prepare_executable(*args, **kwargs) None[source]#
Like the parent’s method, but restores the executable from its backup location, if it was backed up. Then calls the parent method.
- _perform_pspace_backup(**kwargs)[source]#
Overload that skips parameter space backup (already exists).
- utopya.multiverse.get_status_file_paths(run_dir: str, *, status_file_glob='.status*.yml') List[str][source]#
- utopya.multiverse.get_distributed_work_status(run_dir: str, **kwargs) Dict[str, dict | None][source]#
Finds and loads the work status files in the given directory
utopya.parameter module#
This module implements the Parameter class
which is used when validating model and simulation parameters.
- class utopya.parameter.Parameter(*, default: Any, name: str | None = None, description: str | None = None, is_any_of: Sequence[Any] | None = None, limits: Tuple[float | None, float | None] | None = None, limits_mode: str = '[]', dtype: str | type | None = None)[source]#
Bases: object
The parameter class is used when a model parameter needs to be validated before commencing the model run. It can hold information on the parameter itself as well as its valid range and type and other meta-data.
By default, the Parameter class should be assumed to handle scalar parameters like numerical values or strings. For validating sequence-like parameters, corresponding specializing classes are to be implemented.
- SHORTHAND_MODES: Dict[str, Callable] = {'is-bool': <function Parameter.<lambda>>, 'is-in-unit-interval': <function Parameter.<lambda>>, 'is-int': <function Parameter.<lambda>>, 'is-negative': <function Parameter.<lambda>>, 'is-negative-int': <function Parameter.<lambda>>, 'is-negative-or-zero': <function Parameter.<lambda>>, 'is-positive': <function Parameter.<lambda>>, 'is-positive-int': <function Parameter.<lambda>>, 'is-positive-or-zero': <function Parameter.<lambda>>, 'is-probability': <function Parameter.<lambda>>, 'is-string': <function Parameter.<lambda>>, 'is-unsigned': <function Parameter.<lambda>>}#
Shorthand mode factory functions. These are used in the from_shorthand() class method to generate a Parameter object more easily.
Also, utopya.yaml registers each of these shorthand modes as a YAML constructor for tag !<mode>.
- LIMIT_COMPS: Dict[str, Callable] = {'(': <built-in function gt>, ')': <built-in function lt>, '[': <built-in function ge>, ']': <built-in function le>}#
Comparators for the limits check, depending on limits_mode
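A minimal sketch of how these comparators could be combined with a limits_mode string: the `within_limits` function is hypothetical and not part of utopya, while the comparator mapping reproduces the attribute value shown above.

```python
import operator

# Same mapping as the LIMIT_COMPS attribute shown above
LIMIT_COMPS = {
    "(": operator.gt,
    ")": operator.lt,
    "[": operator.ge,
    "]": operator.le,
}


def within_limits(value, limits, limits_mode: str = "[]") -> bool:
    """Hypothetical helper: check a value against (lower, upper) bounds.
    The first character of limits_mode selects the comparator for the
    lower bound, the second one for the upper bound; a bound of None is
    treated as unbounded."""
    lower, upper = limits
    lower_ok = lower is None or LIMIT_COMPS[limits_mode[0]](value, lower)
    upper_ok = upper is None or LIMIT_COMPS[limits_mode[1]](value, upper)
    return lower_ok and upper_ok


print(within_limits(0, (0, 1)))        # closed interval includes 0
print(within_limits(0, (0, 1), "(]"))  # open lower bound excludes 0
```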
- __init__(*, default: Any, name: str | None = None, description: str | None = None, is_any_of: Sequence[Any] | None = None, limits: Tuple[float | None, float | None] | None = None, limits_mode: str = '[]', dtype: str | type | None = None)[source]#
Creates a new Parameter object, which holds a default value as well as some constraints on the possible values this parameter can assume.
- Parameters:
default (Any) – the default value of the parameter.
name (str, optional) – the name of the parameter.
description (str, optional) – a description of this parameter or its effects.
is_any_of (Sequence[Any], optional) – a sequence of possible values this parameter can assume. If this parameter is given, limits cannot be used.
limits (Tuple[Optional[float], Optional[float]], optional) – the lower and upper bounds of the parameter (only applicable to scalar numerals). If None, the bound is assumed to be negative or positive infinity, respectively. Whether boundary values are included in the interval is controlled by the limits_mode argument. This argument is mutually exclusive with is_any_of!
limits_mode (str, optional) – whether to interpret the limits as an open, closed, or semi-closed interval. Possible values: '[]' (closed, default), '()' (open), '[)', and '(]'.
dtype (Union[str, type], optional) – expected data type of this parameter. Accepts all strings that are accepted by numpy.dtype, e.g. int, float, uint16, string.
- Raises:
TypeError – On a limits argument that was not tuple-like or if a limits argument was given but the default was a
ValueError – if an invalid limits_mode is passed, if limits and is_any_of are both passed, or if the limits argument did not have length 2.
- property default#
The default value for this parameter
- property name#
The name of this parameter
- property description#
The description of this parameter
- validate(value: Any, *, raise_exc: bool = True) bool[source]#
Checks whether the given value would be a valid parameter.
The checks for the corresponding arguments are carried out in the following order:
is_any_of
dtype
limits
The data type is checked according to the numpy type hierarchy, see docs. To reduce strictness, the following additional compatibilities are taken into account:
for unsigned integer dtype, a signed integer-type value is compatible if value >= 0
for floating-point dtype, integer-type value are always considered compatible
for floating-point dtype, value of all floating-point types are considered compatible, even if they have a lower precision (note the coercion test below, though)
Additionally, it is checked whether value is representable as the target data type. This is done by coercing value to dtype and then checking for equality (using np.isclose).
- Parameters:
value (Any) – The value to test.
raise_exc (bool, optional) – Whether to raise an exception or not.
- Returns:
Whether or not the given value is a valid parameter.
- Return type:
- Raises:
ValidationError – If validation failed or is impossible (for instance due to ambiguous validity parameters). This error message contains further information on why validation failed.
- classmethod from_shorthand(default: Any, *, mode: str, **kwargs)[source]#
Constructs a Parameter object from a given shorthand mode.
- Parameters:
default (Any) – the default value for the parameter
mode (str) – A valid shorthand mode, see SHORTHAND_MODES
**kwargs – any further arguments for Parameter initialization, see __init__().
- Returns:
a Parameter object
- utopya.parameter.extract_validation_objects(model_cfg: dict, *, model_name: str) Tuple[dict, dict][source]#
Extracts all Parameter objects from a model configuration (a nested dict), replacing them with their default values. Returns both the modified model configuration as well as the Parameter objects (keyed by the key sequence necessary to reach them within the model configuration).
- Parameters:
- Returns:
- a tuple of (model config, parameters to validate).
The model config contains the passed config dict in which all Parameter class elements have been replaced by their default entries. The second entry is a dictionary consisting of the Parameter class objects (requiring validation) with keys being key sequences to those Parameter objects. Note that the key sequence is relative to the level above the model configuration, with model_name as a common entry for all returned values.
- Return type:
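The extraction described above can be sketched without utopya itself. The `Param` class is a hypothetical stand-in for Parameter that only carries a default, `extract_params` is an illustrative helper that returns a new dict instead of modifying the input, and the configuration keys are made up.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class Param:
    """Hypothetical stand-in for utopya's Parameter; only the default
    value is needed for this sketch."""
    default: Any


def extract_params(model_cfg: dict, *, model_name: str):
    """Replace all Param objects in a nested dict by their defaults and
    collect them, keyed by the key sequence leading to them. The key
    sequences start with the model name."""
    to_validate = {}

    def _walk(d: dict, key_seq: tuple) -> dict:
        out = {}
        for key, val in d.items():
            seq = key_seq + (key,)
            if isinstance(val, Param):
                to_validate[seq] = val
                out[key] = val.default
            elif isinstance(val, dict):
                out[key] = _walk(val, seq)
            else:
                out[key] = val
        return out

    return _walk(model_cfg, (model_name,)), to_validate


# Made-up model configuration
cfg = {
    "growth_rate": Param(default=0.1),
    "space": {"size": Param(default=32), "periodic": True},
}
clean_cfg, params = extract_params(cfg, model_name="MyModel")
print(clean_cfg)
print(sorted(params))
```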
utopya.plotting module#
DEPRECATED module that provides backwards-compatibility for the old utopya module structure.
Deprecated since version 1.0.0: This module will be removed soon, please use utopya.eval instead.
utopya.project_registry module#
Implementation of the utopya project registry
- class utopya.project_registry.ProjectPaths(*, base_dir: Annotated[Path, PathType(path_type=dir)], project_info: Annotated[Path, PathType(path_type=file)] | None = None, models_dir: Annotated[Path, PathType(path_type=dir)] | None = None, py_tests_dir: Annotated[Path, PathType(path_type=dir)] | None = None, py_plots_dir: Annotated[Path, PathType(path_type=dir)] | None = None, mv_project_cfg: Annotated[Path, PathType(path_type=file)] | None = None, project_base_plots: Annotated[Path, PathType(path_type=file)] | None = None)[source]#
Bases: BaseSchema
Schema to use for a project’s paths field
- _abc_impl = <_abc._abc_data object>#
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'validate_assignment': True, 'validate_default': True}#
Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.
- class utopya.project_registry.ProjectMetadata(*, version: str | None = None, long_name: str | None = None, description: str | None = None, long_description: str | None = None, license: str | None = None, authors: List[str] | None = None, email: str | None = None, website: str | None = None, utopya_compatibility: str | None = None, language: str | None = None, requirements: List[str] | None = None, misc: Dict[str, Any] | None = None)[source]#
Bases: BaseSchema
Schema to use for a project’s metadata field
- _abc_impl = <_abc._abc_data object>#
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'validate_assignment': True, 'validate_default': True}#
Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.
- class utopya.project_registry.ProjectSettings(*, preload_project_py_plots: bool | None = None, preload_framework_py_plots: bool | None = None)[source]#
Bases: BaseSchema
Schema to use for a project’s settings field
- preload_project_py_plots: bool | None#
Whether to preload the project-level plot module (py_plots_dir) after initialization of the PlotManager. If not given, will load the module.
- preload_framework_py_plots: bool | None#
Whether to preload the framework-level plot module (py_plots_dir) after initialization of the PlotManager. If not given, will load the module.
- _abc_impl = <_abc._abc_data object>#
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'validate_assignment': True, 'validate_default': True}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class utopya.project_registry.ProjectSchema(*, project_name: str, framework_name: str | None = None, paths: ProjectPaths, metadata: ProjectMetadata, settings: ProjectSettings = {}, run_cfg_format: str = 'yaml', cfg_set_abs_search_dirs: List[str] | None = None, cfg_set_model_source_subdirs: List[str] | None = None, custom_py_modules: Dict[str, Annotated[Path, PathType(path_type=dir)]] | None = None, output_files: dict | None = None, debug_level_updates: Dict[str, dict] | None = None)[source]#
Bases:
BaseSchema
The data model for a project registry entry
- paths: ProjectPaths#
- metadata: ProjectMetadata#
- settings: ProjectSettings#
- _abc_impl = <_abc._abc_data object>#
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'validate_assignment': True, 'validate_default': True}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class utopya.project_registry.Project(name: str, *, registry: YAMLRegistry = None, **data)[source]#
Bases:
RegistryEntry
A registry entry that describes a project
- SCHEMA#
alias of ProjectSchema
- property framework_project: Project | None#
If a framework project is defined, retrieve it from the registry
- get_git_info(*, include_patch_info: bool = False) dict[source]#
Returns information about the state of this project’s git repository using the python-git-info package.
If no git information is retrievable, e.g. because the project’s base_dir does not contain a git repository, will still return a dict but with the have_git_info entry set to False.
Otherwise, the git information will be in the latest_commit entry.
- Parameters:
include_patch_info (bool, optional) – If True, will attempt a subprocess call to git and store patch information alongside in the diff entry. In that case, the dirty entry will denote whether there were uncommitted changes.
- Returns:
A dict containing information about the associated git repo.
- Return type:
dict
- class utopya.project_registry.ProjectRegistry(registry_dir: str | None = None)[source]#
Bases:
YAMLRegistry
The project registry
- __init__(registry_dir: str | None = None)[source]#
Initializes the project registry, loading available entries from the registry directory in the utopya config directory.
This also creates the projects directory, if not created yet.
- Parameters:
registry_dir (str, optional) – A custom projects
- register(*, base_dir: str, info_file: str | None = None, custom_project_name: str | None = None, require_matching_names: bool | None = None, exists_action: str = 'raise') Project[source]#
Register or update information of a project.
- Parameters:
base_dir (str) – Project base directory
info_file (str, optional) – Path to info file which contains further path information and metadata (may be relative to base directory). If not given, will use some defaults to search for it.
custom_project_name (str, optional) – Custom project name, overwrites the one given in the info file
require_matching_names (bool, optional) – If set, will require that the custom project name is equal to the one given in the project info file. This allows checking that the file content does not diverge from some outside state.
exists_action (str, optional) – Action to take upon existing project
- Returns:
Project information for the new or validated project
- Return type:
Project
- utopya.project_registry.PROJECTS = <utopya.project_registry.ProjectRegistry object>#
The package-wide project registry
utopya.reporter module#
Implementation of the reporter framework which can be used to report on the progress or result of operations within utopya.
- utopya.reporter._DEFAULT_distributed_status_fstr: str = ' {progress_here:>5s} @ {host_name_short:12s} - {pid:7d}: {status:10s} ({tags})'#
The format string to use for the joined run status
- class utopya.reporter.ReportFormat(*, parser: Callable, writers: List[Callable], min_report_intv: float | None = None)[source]#
Bases:
object
A report format aggregates callables for a single report parser and potentially multiple report writers. As a whole, it contains all arguments needed to generate a certain kind of report.
It is used in utopya.reporter.Reporter and derived classes, which are the classes that actually implement the parsers and writers.
- __init__(*, parser: Callable, writers: List[Callable], min_report_intv: float | None = None)[source]#
Initializes a ReportFormat object, which gathers callables needed to create a report in a certain format.
- Parameters:
parser (Callable) – The parser method to use
writers (List[Callable]) – The writer method(s) to use
min_report_intv (float, optional) – The minimum report interval of reports in this format. Determines the time (in seconds) that needs to have passed before the next report will be emitted.
- property min_report_intv: timedelta | None#
Returns the minimum report interval, i.e. the time that needs to have passed between two reports.
- property reporting_blocked: bool#
Determines whether this ReportFormat may be blocked from emission, e.g. because of the minimum report interval not having passed yet.
If no minimum report interval is given, will always return False. Otherwise checks if at least that interval has passed since the last report.
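The blocking rule described above can be illustrated with a small stand-alone sketch. It is not the utopya implementation: the class name `IntervalGate` and its methods are hypothetical, and the only behaviour taken from the description is that no interval means never blocked, and otherwise the interval must have passed since the last report.

```python
from datetime import datetime, timedelta
from typing import Optional


class IntervalGate:
    """Hypothetical stand-in for the interval check; not utopya's ReportFormat."""

    def __init__(self, min_report_intv: Optional[float] = None):
        # Seconds are stored as a timedelta, mirroring the property's type
        self.min_report_intv = (
            timedelta(seconds=min_report_intv)
            if min_report_intv is not None
            else None
        )
        self.last_report: Optional[datetime] = None

    def blocked(self, now: datetime) -> bool:
        # Without an interval (or before the first report), never block
        if self.min_report_intv is None or self.last_report is None:
            return False
        return (now - self.last_report) < self.min_report_intv

    def report(self, now: datetime) -> bool:
        # Returns whether a report was emitted at time `now`
        if self.blocked(now):
            return False
        self.last_report = now
        return True


gate = IntervalGate(min_report_intv=2.0)
t0 = datetime(2024, 1, 1, 12, 0, 0)
first = gate.report(t0)                          # emitted
second = gate.report(t0 + timedelta(seconds=1))  # too early, blocked
third = gate.report(t0 + timedelta(seconds=3))   # interval has passed
```

Passing the current time explicitly keeps the sketch deterministic; a real reporter would presumably read the clock itself.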
- class utopya.reporter.Reporter(*, report_formats: List[str] | Dict[str, dict] | None = None, default_format: str | None = None, report_dir: str | None = None, suppress_cr: bool = False)[source]#
Bases:
object
The Reporter class holds general reporting capabilities.
It needs to be subclassed in order to specialize its reporting functions.
- __init__(*, report_formats: List[str] | Dict[str, dict] | None = None, default_format: str | None = None, report_dir: str | None = None, suppress_cr: bool = False)[source]#
Initialize the Reporter base class.
- Parameters:
report_formats (Union[List[str], Dict[str, dict]], optional) – The report formats to use with this reporter. If given as list of strings, the strings are the names of the report formats as well as those of the parsers; all other parameters are the defaults. If given as dict of dicts, the keys are the names of the formats and the inner dicts are the parameters to create report formats from.
default_format (str, optional) – The name of the default report format; if None is given, the .report method requires the name of a report format.
report_dir (str, optional) – If reporting to a file, this is the base directory that is reported to.
suppress_cr (bool, optional) – Whether to suppress carriage return characters in writers. This option is useful when the reporter is not the only class that writes to a stream.
- property default_format: None | ReportFormat#
Returns the default report format or None, if not set.
- property suppress_cr: bool#
Whether to suppress a carriage return. Objects using the reporter can set this property to communicate that they will be putting content into the stdout stream as well. The writers can check this property and adjust their behaviour accordingly.
- add_report_format(name: str, *, parser: str | None = None, write_to: str | Dict[str, dict] = 'stdout', min_report_intv: float | None = None, rf_kwargs: dict | None = None, **parser_kwargs)[source]#
Add a report format to this reporter.
- Parameters:
name (str) – The name of this format
parser (str, optional) – The name of the parser; if not given, the name of the report format is assumed
write_to (Union[str, Dict[str, dict]], optional) – The name of the writer. If this is a dict of dicts, the keys will be interpreted as the names of the writers and the nested dicts as the **kwargs to the writer function.
min_report_intv (float, optional) – The minimum report interval (in seconds) for this report format
rf_kwargs (dict, optional) – Further kwargs to ReportFormat.__init__
**parser_kwargs – The kwargs to the parser function
- Raises:
ValueError – A report format with this name already exists
- report(report_format: str | None = None, **kwargs) bool[source]#
Create a report with the given format; if none is given, the default format is used.
- Parameters:
report_format (str, optional) – The report format to use
**kwargs – Passed on to the ReportFormat.report() call
- Returns:
Whether there was a report
- Return type:
bool
- Raises:
ValueError – If no default format was set and no report format name was given
- parse_and_write(*, parser: str | Callable, write_to: str | Callable, **parser_kwargs)[source]#
This function allows selecting a parser and writer explicitly.
- Parameters:
- _resolve_parser(parser: str | Callable, **parser_kwargs) Callable[source]#
Given a string or a callable, returns the corresponding callable.
- Parameters:
parser (Union[str, Callable]) – If a callable is already given, returns that; otherwise looks for a parser method with the given name in the attributes of this class.
**parser_kwargs – Arguments that should be passed to the parser. If given, a new function is created where these arguments are already included.
- Returns:
The desired parser function
- Return type:
Callable
- Raises:
ValueError – If no parser with the given name is available
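A minimal sketch of this resolution logic, under stated assumptions: the `MiniReporter` class is hypothetical, and looking up the method under a `_parse_` prefix is an assumption based on the naming of the parser methods documented further down. The binding of keyword arguments uses `functools.partial`, which is one way to create "a new function where these arguments are already included".

```python
from functools import partial
from typing import Callable, Union


class MiniReporter:
    """Hypothetical reporter; the `_parse_` prefix lookup is an assumption."""

    def _parse_greeting(self, *, name: str = "world") -> str:
        return f"hello {name}"

    def resolve_parser(
        self, parser: Union[str, Callable], **parser_kwargs
    ) -> Callable:
        if not callable(parser):
            try:
                parser = getattr(self, "_parse_" + parser)
            except AttributeError as err:
                raise ValueError(f"No parser named '{parser}' available!") from err
        # Bind the kwargs now so that the caller can invoke the parser bare
        return partial(parser, **parser_kwargs) if parser_kwargs else parser


rep = MiniReporter()
parse = rep.resolve_parser("greeting", name="utopya")
result = parse()  # "hello utopya"
```

A callable that is passed in directly is returned unchanged when no extra keyword arguments are given.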
- _resolve_writers(write_to) Dict[str, Callable][source]#
Resolves the given argument to a dict of callable writer functions.
- Parameters:
write_to – A specification of the writers to use. Allows many different ways of specifying the writer functions, depending on the type of the argument:
str: the name of the writer method of this reporter
Callable: the writer function to use
sequence of str and/or Callable: the names and/or functions to use
Dict[str, dict]: the names of the writer functions and additional keyword arguments.
If the type is wrong, will raise.
- Returns:
the writers (key: name, value: writer method)
- Return type:
Dict[str, Callable]
- Raises:
TypeError – Invalid write_to argument
ValueError – A writer with that name was already added or a writer with the given name is not available.
- _write_to_stdout(s: str, *, flush: bool = True, **print_kws)[source]#
Writes the given string to stdout using the print function.
- _write_to_stdout_noreturn(s: str, *, prepend=' ')[source]#
Writes to stdout without ending the line. Always flushes.
- _write_to_log(s: str, *, lvl: int = 10, skip_if_empty: bool = False)[source]#
Writes the given string via the logging module.
- _write_to_file(s: str, *, path: str = '_report.txt', mode: str = 'w', skip_if_empty: bool = False)[source]#
Writes the given string to a file
- Parameters:
s (str) – The string to write
path (str, optional) – The path to write it to; will be assumed relative to the report_dir attribute; if that is not given, path needs to be absolute. By default, assumes that there is a report_dir given.
mode (str, optional) – Writing mode of that file
skip_if_empty (bool, optional) – Whether to skip writing if s is empty.
- Raises:
ValueError – If report_dir was not set and path is relative.
- class utopya.reporter.WorkerManagerReporter(wm: utopya.workermanager.WorkerManager, *, mv: utopya.multiverse.Multiverse = None, **reporter_kwargs)[source]#
Bases:
Reporter
This class specializes the base Reporter to report on the WorkerManager state and its progress.
- TTY_MARGIN = 4#
Margin to use when writing to terminal
- PROGRESS_BAR_SYMBOLS = {'active': '░', 'active_progress': '▒', 'skipped': '»', 'space': ' ', 'success': '▓'}#
Symbols to use in progress bar parser
- LATEST_WM_REPORT_TO_STATUS: Dict[str, str] = {'after_cancel': 'cancelled', 'after_fail': 'failed', 'after_work': 'finished'}#
Maps WorkerManager report names to a worker status; used in determining the work status.
- __init__(wm: utopya.workermanager.WorkerManager, *, mv: utopya.multiverse.Multiverse = None, **reporter_kwargs)[source]#
Initialize the specialized reporter for the WorkerManager.
It is aware of the WorkerManager and may additionally have access to the Multiverse it is embedded in, which provides additional information to report parsers.
- Parameters:
wm (utopya.workermanager.WorkerManager) – The associated WorkerManager instance
mv (utopya.multiverse.Multiverse, optional) – The Multiverse this reporter is used in. If this is provided, it can be used in report parsers, e.g. to provide additional information on simulations.
**reporter_kwargs – Passed on to parent method
- property wm: utopya.workermanager.WorkerManager#
Returns the associated WorkerManager
- property task_counters: OrderedDict#
Returns a dict of task counters from the WorkerManager
- property wm_elapsed: timedelta | None#
Time elapsed since start of working or None if not yet started
- property wm_times: dict#
Return the characteristics of WorkerManager times. Calls get_progress_info() without any additional arguments.
- register_task(task: utopya.task.WorkerTask)[source]#
Given the task object, extracts and stores some information like its run time or its exit code. Exit codes are aggregated over multiple registrations.
This can be called from a callback function of a WorkerTask object in order to relay information to the reporter.
- Parameters:
task (utopya.task.WorkerTask) – The WorkerTask to extract information from.
- calc_runtime_statistics(min_num: int = 10) OrderedDict[source]#
Calculates the current runtime statistics.
- Parameters:
min_num (int, optional) – Minimum number of runtimes that need to be registered for advanced statistics to actually be computed. If below this number, not all entries will exist.
- Returns:
The runtime statistics. If there are no runtimes yet, only the total (wall) entry will be there. If there are too few
- Return type:
OrderedDict
- get_progress_info(**eta_options) Dict[str, float][source]#
Compiles a dict containing progress information for the current work session.
- Parameters:
**eta_options – Passed on to method calculating est_left, _compute_est_left().
- Returns:
Progress information. Guaranteed to contain the keys start, now, elapsed, est_left, est_end, and end.
- Return type:
Dict[str, float]
- _compute_progress(counters: Dict[str, int] | None = None) Dict[str, float][source]#
Given task counters, computes various progress measures, each a value between 0 and 1.
- _compute_est_left(*, progress: Dict[str, float], elapsed: timedelta, mode: str = 'from_start', progress_buffer_size: int = 60) timedelta | None[source]#
Computes the estimated time left until the end of the work session (ETA) using the current progress value and the elapsed time. Depending on mode, additional information may be included in the calculation.
Note
When task skipping is enabled, ETA computation becomes more difficult.
- Parameters:
progress (float) – The current progress value, in (0, 1]
elapsed (datetime.timedelta) – The elapsed time since start
mode (str, optional) – By which mode to calculate the ETA. Available modes are:
from_start, where ETA is computed from the start of the work session.
from_buffer, where ETA is computed from a more recent point during the work session. This uses a buffer to keep track of recent progress and computes the ETA against the oldest record (controlled by argument progress_buffer_size), giving more accurate estimates for long-running work sessions.
progress_buffer_size (int, optional) – The size of the ring buffer used in from_buffer mode.
- Returns:
Estimate for how much time is left until the end of the work session. If it cannot be estimated yet, e.g. because no progress was made, will return None.
- Return type:
Optional[datetime.timedelta]
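The two modes can be sketched as simple linear extrapolations. This is an illustration under assumptions, not the utopya code: the function names are hypothetical, progress is taken as a single float, and the extrapolation formula itself is assumed rather than taken from the source.

```python
from collections import deque
from datetime import timedelta
from typing import Optional


def est_left_from_start(progress: float, elapsed: timedelta) -> Optional[timedelta]:
    """Linear extrapolation from the session start (assumed formula)."""
    if progress <= 0:
        return None  # no progress yet, cannot estimate
    # total ≈ elapsed / progress, hence remaining ≈ elapsed * (1 - p) / p
    return elapsed * ((1.0 - progress) / progress)


def est_left_from_buffer(
    buffer: deque, progress: float, elapsed: timedelta
) -> Optional[timedelta]:
    """Extrapolates against the oldest record in a ring buffer (assumed)."""
    buffer.append((elapsed, progress))  # deque(maxlen=...) drops old records
    t_old, p_old = buffer[0]
    gained = progress - p_old
    if gained <= 0:
        return None
    # Time needed per unit of progress, times the progress still missing
    return (elapsed - t_old) / gained * (1.0 - progress)


eta = est_left_from_start(0.25, timedelta(minutes=10))  # 30 minutes

buf = deque(maxlen=60)
eta_a = est_left_from_buffer(buf, 0.0, timedelta(0))           # None
eta_b = est_left_from_buffer(buf, 0.5, timedelta(minutes=10))  # 10 minutes
```

Because the buffer only looks at recent records, a change in task throughput late in a long session affects the estimate sooner than it would in the from-start variant.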
- _parse_task_counters(*, report_no: int | None = None) str[source]#
Return a string that shows the task counters of the WorkerManager
- Parameters:
report_no (int, optional) – A counter variable passed by the ReportFormat call, indicating how often this parser was called so far.
- Returns:
A str representation of the task counters of the WorkerManager
- Return type:
str
- _parse_progress(*, report_no: int | None = None) str[source]#
Returns a progress string
- Parameters:
report_no (int, optional) – A counter variable passed by the ReportFormat call, indicating how often this parser was called so far.
- Returns:
A simple progress indicator
- Return type:
str
- _parse_progress_bar(*, num_cols: str | int = 'fixed', fstr: str = ' ╠{:}╣ {info:}{times:}', info_fstr: str = '{prg[total]:>5.1f}% ', show_times: bool = False, times_fstr: str = '| {elapsed:} elapsed | ~{est_left:} left ', times_fstr_final: str = '| finished in {elapsed:} ', times_kwargs: dict = {}, report_no: int | None = None) str[source]#
Returns a progress bar.
It shows the amount of finished tasks, active tasks, and a percentage.
- Parameters:
num_cols (Union[str, int], optional) – The number of columns available for creating the progress bar. Can also be a string: adaptive to poll terminal size upon each call, or fixed to use the number of columns determined at import time.
fstr (str, optional) – The format string for the final output. Should contain the pbar string, which makes up the progress bar, and can optionally contain the info and times segments, formatted using the respective format string arguments.
info_fstr (str, optional) – The format string for the info section of the final output. Available keys:
prg, dict with various progress measures in percent: total, active, skipped, failed, success, …
cnt, the task counters dictionary, see: task_counters()
show_times (bool, optional) – Whether to show a short version of the results of the times parser
times_fstr (str, optional) – Format string for times information
times_fstr_final (str, optional) – Format string for times information once the work session has ended
times_kwargs (dict, optional) – Passed on to times parser. Only used if show_times is set.
report_no (int, optional) – A counter variable passed by the ReportFormat call, indicating how often this parser was called so far.
- Returns:
The one-line progress bar
- Return type:
str
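To make the idea of the one-line progress bar concrete, here is a rough stand-alone sketch. It reuses some of the symbols from PROGRESS_BAR_SYMBOLS and a shortened variant of the default format string, but everything else is an assumption: the function `render_bar`, the order of the segments, the rounding, and the way the percentage is computed are hypothetical and may differ from what the actual parser does.

```python
SYMBOLS = {"active": "░", "skipped": "»", "space": " ", "success": "▓"}


def render_bar(*, total: int, success: int = 0, active: int = 0,
               skipped: int = 0, width: int = 20) -> str:
    """Hypothetical renderer; segment order and rounding are assumptions."""

    def cols(n: int) -> int:
        # Round down so that the segments never exceed the bar width
        return int(width * n / total) if total > 0 else 0

    segments = (
        SYMBOLS["success"] * cols(success)
        + SYMBOLS["skipped"] * cols(skipped)
        + SYMBOLS["active"] * cols(active)
    )
    # Pad the remainder of the bar with the space symbol
    bar = segments + SYMBOLS["space"] * (width - len(segments))
    percent = 100.0 * success / total if total > 0 else 0.0
    return " ╠{:}╣ {info:}".format(bar, info="{:>5.1f}% ".format(percent))


line = render_bar(total=10, success=5, active=2)
```

With ten tasks, five finished and two active, half of the 20 columns are filled with the success symbol and four with the active symbol.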
- _parse_times(*, fstr: str = 'Elapsed: {elapsed:<8s} | Est. left: {est_left:<8s} | Est. end: {est_end:<10s}', timefstr_short: str = '%H:%M:%S', timefstr_full: str = '%d.%m., %H:%M:%S', use_relative: bool = True, times: dict | None = None, report_no: int | None = None, **progress_info_kwargs) str[source]#
Parses the WorkerManager’s time information, including estimated time left or others.
- Parameters:
fstr (str, optional) – The main format string; gets as keys the results of the WorkerManager time information. Available keys: elapsed, est_left, est_end, start, now, end.
timefstr_short (str, optional) – A time format string for absolute dates; short version.
timefstr_full (str, optional) – A time format string for absolute dates; long (ideally: full) version.
use_relative (bool, optional) – Whether for a date difference of 1 to use relative dates, e.g. Today, 13:37.
times (dict, optional) – A dict of times to use; this is mainly for testing purposes!
report_no (int, optional) – The report number passed by ReportFormat
**progress_info_kwargs – Passed on to the method calculating progress, get_progress_info()
- Returns:
A string representation of the time information
- Return type:
str
- _parse_runtime_stats(*, fstr: str = ' {k:<13s} {v:}', join_char='\n', ms_precision: int = 1, report_no: int | None = None) str[source]#
Parses the runtime statistics dict into a multiline string
- Parameters:
fstr (str, optional) – The format string to use. Gets passed the keys k and v where k is the name of the entry and v its value. Note that v is a non-numeric value.
join_char (str, optional) – The join character / string to join the elements together.
ms_precision (int, optional) – Number of digits to represent the milliseconds part of the runtimes.
report_no (int, optional) – A counter variable passed by the ReportFormat call, indicating how often this parser was called so far.
- Returns:
The multi-line runtime statistics
- Return type:
str
- _parse_distributed_work_status(*, fstr: str = ' {progress_here:>5s} @ {host_name_short:12s} - {pid:7d}: {status:10s} ({tags})', distributed_work_status: dict | None = None, include_header: bool = True, report_no: int | None = None) str[source]#
Loads the work status of this and the distributed workers and creates a status string from it.
- _parse_report(*, fstr: str = ' {k:<{w:}s} {v:}', min_num: int = 2, report_no: int | None = None, show_host_info: bool = True, show_exit_codes: bool = True, show_distributed_run_info: bool = True, distributed_status_fstr: str = ' {progress_here:>5s} @ {host_name_short:12s} - {pid:7d}: {status:10s} ({tags})', show_individual_runtimes: bool = True, max_num_to_show: int = 2048, task_label_singular: str = 'task', task_label_plural: str = 'tasks') str[source]#
Parses a report for all tasks that were being worked on into a multiline string. The headings can be adjusted by keyword arguments.
- Parameters:
fstr (str, optional) – The format string to use. Gets passed the keys k and v where k is the name of the entry and v its value. Note that this format string is also used with v being a non-numeric value. Also, w can be used to have a key column of constant width.
min_num (int, optional) – The minimum number of universes needed to calculate extended runtime statistics.
report_no (int, optional) – A counter variable passed by the ReportFormat call, indicating how often this parser was called so far.
show_host_info (bool, optional) – Whether to show basic information about the host machine
show_exit_codes (bool, optional) – Whether to show a table of exit codes of the finished simulations
show_distributed_run_info (bool, optional) – Whether to look for work status report files and show their information.
distributed_status_fstr (str, optional) – How to represent the work status of joined runs. Available keys are those from the status file plus tags, which is a comma-separated string with information on whether this was a joined run (or main run) and a marker which run belongs to this report file.
show_individual_runtimes (bool, optional) – Whether to report individual universe runtimes; default: True. This should be disabled if there are a huge number of universes.
max_num_to_show (int, optional) – Maximum number of tasks to list in the report
task_label_singular (str, optional) – The label to use in the report when referring to a single task.
task_label_plural (str, optional) – The label to use in the report when referring to multiple tasks.
- Returns:
The multi-line simulation report string
- Return type:
str
- _parse_pspace_info(*, fstr: str = '{sweep_info:}', min_tasks_added: int = 0, report_no: int | None = None) str[source]#
Provides information about the parameter space.
Extracts the parameter_space from the associated Multiverse’s meta configuration and provides information on that.
If there are multiple tasks specified, it is assumed that a sweep is or was being carried out and an information string is retrieved from the paramspace.paramspace.ParamSpace object, which is then returned. If only a single task was defined, returns an empty string.
- Parameters:
fstr (str, optional) – The format string the sweep info should be embedded into. Needs to contain the sweep_info key.
min_tasks_added (int, optional) – Number of tasks that need to have been added in order for showing the parameter space info. If zero, will always return the pspace info; this can be useful if invoking this before the WorkerManager got any tasks!
report_no (int, optional) – A counter variable passed by the ReportFormat call, indicating how often this parser was called so far.
- Returns:
If there is more than one task, returns the result of paramspace.paramspace.ParamSpace.get_info_str(). If not, returns a string denoting that there was only one task.
- Return type:
str
- _parse_work_status(*, report_no: int | None = None) str[source]#
Supplies a very simple, YAML-formatted status string for this WorkerManager run.
- _write_to_file(*args, path: str = '_report.txt', cluster_mode_path: str = '{}_{node_name}{ext}', dmv_mode_path: str = '{}__{host_name_short}-{pid}{ext}', skip_if_dmv: bool = False, **kwargs)[source]#
Overloads the parent method with capabilities needed in cluster mode
All args and kwargs are passed through. If in cluster mode, the path is changed such that it includes the name of the node.
- Parameters:
*args – Passed on to parent method
path (str, optional) – The path to save to
cluster_mode_path (str, optional) – The format string to use for the path in cluster mode. Must contain the format key {0:}, which retains the given path, extension split off. The extension can be used via ext (already includes the dot). Additional format keys: node_name, job_id.
dmv_mode_path (str, optional) – The format string to use for the path in a distributed Multiverse run.
skip_if_dmv (bool, optional) – Whether to skip reporting if part of a joined or continued run, i.e.: originating from a DistributedMultiverse run.
**kwargs – Passed on to parent method
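How such path format strings expand can be shown with plain `str.format`. The helper `expand_report_path` below is hypothetical, and the node name, host name and process ID are made-up example values; only the two default format strings are taken from the signature above.

```python
import os


def expand_report_path(path: str, fstr: str, **fmt) -> str:
    """Hypothetical helper: splits off the extension, then fills `fstr`."""
    base, ext = os.path.splitext(path)  # `ext` keeps the leading dot
    # `base` fills the positional key; unused keyword keys are ignored
    return fstr.format(base, ext=ext, **fmt)


cluster_path = expand_report_path(
    "_report.txt", "{}_{node_name}{ext}", node_name="node042", job_id="1234"
)
dmv_path = expand_report_path(
    "_report.txt", "{}__{host_name_short}-{pid}{ext}",
    host_name_short="myhost", pid=4711,
)
```

Here `cluster_path` becomes `_report_node042.txt` and `dmv_path` becomes `_report__myhost-4711.txt`, so reports written from different nodes or processes do not overwrite each other.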
utopya.stop_conditions module#
This module implements the StopCondition
class, which is used by the WorkerManager to
stop a worker process in certain situations.
In addition, it implements a set of basic stop condition functions and
provides the stop_condition_function()
decorator which is required to make them accessible by name.
- utopya.stop_conditions.SIG_STOPCOND: str = 'SIGUSR1'#
Signal to use for stopping workers with fulfilled stop conditions
- utopya.stop_conditions.STOP_CONDITION_FUNCS: Dict[str, Callable] = {'check_monitor_entry': <function check_monitor_entry>, 'timeout_wall': <function timeout_wall>}#
Registered stop condition functions are stored in this dictionary. These functions evaluate whether a certain stop condition is actually fulfilled.
To that end, a WorkerTask object is passed to these functions, the information in which can be used to determine whether the condition is fulfilled. The signature of these functions is: (task: WorkerTask, **kws) -> bool
- utopya.stop_conditions._FAILED_MONITOR_ENTRY_CHECKS = []#
Keeps track of failed monitor entry checks in the check_monitor_entry() stop condition function in order to avoid repetitive warnings.
- class utopya.stop_conditions.StopCondition(*, to_check: List[dict] | None = None, name: str | None = None, description: str | None = None, enabled: bool = True, func: Callable | str | None = None, **func_kwargs)[source]#
Bases:
object
A StopCondition object holds information on the conditions in which a worker process should be stopped.
- __init__(*, to_check: List[dict] | None = None, name: str | None = None, description: str | None = None, enabled: bool = True, func: Callable | str | None = None, **func_kwargs)[source]#
Create a new stop condition object.
- Parameters:
to_check (List[dict], optional) – A list of dicts that holds the functions to call and the arguments to call them with. The only requirement for the dict is that the func key is available. All other keys are unpacked and passed as kwargs to the given function. The func key can be either a callable or a string corresponding to a name in the utopya.stopcond_funcs module.
name (str, optional) – The name of this stop condition
description (str, optional) – A short description of this stop condition
enabled (bool, optional) – Whether this stop condition should be checked; if False, it will be created but will always be unfulfilled when checked.
func (Union[Callable, str], optional) – (For the short syntax only!) If no to_check argument is given, a function can be given here that will be the only one that is checked. If this argument is a string, it is also resolved from the utopya stopcond_funcs module.
**func_kwargs – (For the short syntax) The kwargs that are passed to the single stop condition function
- property fulfilled_for: Set[utopya.task.Task]#
The set of tasks this stop condition was fulfilled for
- static _resolve_sc_funcs(to_check: List[dict], func: str | Callable, func_kwargs: dict) List[tuple][source]#
Resolves the functions and kwargs that are to be checked.
The callable is either retrieved from the module-level stop condition functions registry or, if the given func is already a callable, that one will be used.
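The following sketch illustrates how the long syntax (a `to_check` list of dicts) and the short syntax (a single `func` plus kwargs) could be resolved into a uniform list of `(callable, kwargs)` pairs. The registry `FUNCS`, the two example functions, and the exact tuple layout are assumptions for illustration; a plain string stands in for the task object.

```python
from typing import Callable, Dict, List, Optional, Tuple, Union


def always(task, **kws) -> bool:
    return True


def longer_than(task, *, n: int) -> bool:
    return len(task) > n


# Hypothetical registry, standing in for the module-level function registry
FUNCS: Dict[str, Callable] = {"always": always, "longer_than": longer_than}


def resolve_sc_funcs(
    to_check: Optional[List[dict]],
    func: Union[str, Callable, None] = None,
    func_kwargs: Optional[dict] = None,
) -> List[Tuple[Callable, dict]]:
    if to_check is None:
        # Short syntax: a single function with its kwargs
        to_check = [dict(func=func, **(func_kwargs or {}))]

    resolved = []
    for spec in to_check:
        spec = dict(spec)  # copy, so the caller's dict is left untouched
        f = spec.pop("func")
        if not callable(f):
            f = FUNCS[f]  # resolve by name from the registry
        resolved.append((f, spec))  # remaining keys are the kwargs
    return resolved


checks = resolve_sc_funcs([dict(func="longer_than", n=2), dict(func="always")])
# Evaluate every function first, then require all of them to be True
is_fulfilled = all([f("abcd", **kws) for f, kws in checks])
```

Building the full list of results before calling `all` means every function is evaluated, even if an earlier one already returned False.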
- __str__() str[source]#
A string representation for this StopCondition, including the name and, if given, the description.
- fulfilled(task: utopya.task.Task) bool[source]#
Checks if the stop condition is fulfilled for the given worker, using the information from the dict.
All given stop condition functions are evaluated; if all of them return True, this method will also return True.
Furthermore, if the stop condition is fulfilled, the task’s set of fulfilled stop conditions will
- Parameters:
task (utopya.task.Task) – Task object that is to be checked
- Returns:
Whether all stop condition functions returned True for the given worker and its current information
- Return type:
bool
- yaml_tag = '!stop-condition'#
- classmethod to_yaml(representer, node)[source]#
Creates a yaml representation of the StopCondition object by storing the initialization kwargs as a yaml mapping.
- Parameters:
representer (ruamel.yaml.representer) – The representer module
node (StopCondition) – The node, i.e. an instance of this class
- Returns:
a yaml mapping that is able to recreate this object
- utopya.stop_conditions.stop_condition_function(f: Callable)[source]#
A decorator that registers the decorated callable in the module-level stop condition function registry. The callable’s __name__ attribute will be used as the key.
- Parameters:
f (Callable) – A callable that is to be added to the function registry.
- Raises:
AttributeError – If the name already exists in the registry
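A registering decorator of this kind can be sketched in a few lines. The names `REGISTRY`, `register_stop_condition` and `too_many_steps`, as well as the `steps` attribute on the task, are hypothetical and only serve the illustration; the behaviour taken from the description is the use of `__name__` as key and the AttributeError on duplicates.

```python
from types import SimpleNamespace
from typing import Callable, Dict

REGISTRY: Dict[str, Callable] = {}  # stand-in for STOP_CONDITION_FUNCS


def register_stop_condition(f: Callable) -> Callable:
    """Registers `f` under its `__name__`; refuses duplicate names."""
    name = f.__name__
    if name in REGISTRY:
        raise AttributeError(f"A function named '{name}' is already registered!")
    REGISTRY[name] = f
    return f  # return the function unchanged so it stays directly usable


@register_stop_condition
def too_many_steps(task, *, max_steps: int) -> bool:
    # `task.steps` is a hypothetical attribute used only for this sketch
    return task.steps > max_steps


task = SimpleNamespace(steps=11)
triggered = REGISTRY["too_many_steps"](task, max_steps=10)
```

Once registered, the function can be referred to by name, e.g. from a configuration file, instead of being passed around as an object.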
- utopya.stop_conditions.timeout_wall(task: utopya.task.WorkerTask, *, seconds: float) bool[source]#
Checks the wall timeout of the given worker
- Parameters:
task (utopya.task.WorkerTask) – The WorkerTask object to check
seconds (float) – After how many seconds to trigger the wall timeout
- Returns:
Whether the timeout is fulfilled
- Return type:
bool
- utopya.stop_conditions.check_monitor_entry(task: utopya.task.WorkerTask, *, entry_name: str, operator: str, value: float) bool[source]#
Checks if a monitor entry compares in a certain way to a given value
- Parameters:
task (utopya.task.WorkerTask) – The WorkerTask object to check
entry_name (str) – The name of the monitor entry, leading to the value to the left-hand side of the operator
operator (str) – The binary operator to use
value (float) – The right-hand side value to compare to
- Returns:
Result of op(entry, value)
- Return type:
bool
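The comparison can be illustrated with the standard-library `operator` module. This is a sketch under assumptions: the monitor data is modelled as a nested dict, entry names are assumed to be dot-separated paths, the set of supported operator strings is hypothetical, and returning False for a missing entry is a choice made here rather than documented behaviour.

```python
import operator as op
from typing import Any, Callable, Dict

# Hypothetical mapping from operator strings to binary functions
OPERATORS: Dict[str, Callable[[Any, Any], bool]] = {
    "==": op.eq, "!=": op.ne,
    "<": op.lt, "<=": op.le,
    ">": op.gt, ">=": op.ge,
}


def compare_monitor_entry(
    monitor: dict, *, entry_name: str, operator: str, value: float
) -> bool:
    # Walk down the nested dict; the dotted-path syntax is an assumption
    entry: Any = monitor
    for key in entry_name.split("."):
        if not isinstance(entry, dict) or key not in entry:
            return False  # entry not available (yet); assumed behaviour
        entry = entry[key]
    # Entry is the left-hand side, `value` the right-hand side
    return bool(OPERATORS[operator](entry, value))


monitor = {"model": {"density": 0.2}}
below = compare_monitor_entry(
    monitor, entry_name="model.density", operator="<", value=0.5
)
```

In this example, `below` is True because 0.2 < 0.5, which would mark the stop condition as fulfilled.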
utopya.task module#
The Task class supplies a container for all information needed for a task.
The WorkerTask and ProcessTask classes specialize on tasks for the WorkerManager that work on subprocesses or multiprocessing processes.
- utopya.task._ANSI_ESCAPE = re.compile('\\x1B(?:[@-Z\\\\-_]|\\[[0-?]*[ -/]*[@-~])')#
A regex pattern to remove ANSI escape characters, needed for stream saving
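As a short illustration of what such a pattern is used for, the snippet below applies the same regular expression (written as a raw string) to a colored string. The variable names and the sample text are made up for this example.

```python
import re

# Same pattern as shown above, written as a raw string
ANSI_ESCAPE = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")

colored = "\x1b[1;31mERROR\x1b[0m: run failed"
plain = ANSI_ESCAPE.sub("", colored)  # "ERROR: run failed"
```

Removing the escape sequences before saving keeps log files readable in editors that do not interpret terminal colors.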
- utopya.task._follow(f: ~_io.TextIOWrapper, delay: float = 0.05, should_stop: ~typing.Callable = <function <lambda>>) Generator[str, None, None][source]#
Generator that follows the output written to the given stream object and yields each new line written to it. If no output is retrieved, there will be a delay to reduce processor load.
The should_stop argument may be a callable that will lead to breaking out of the waiting loop. If it is not given, the loop will only break if reading from the stream f is no longer possible, e.g. because it was closed.
- utopya.task.enqueue_lines(*, queue: Queue, stream: TextIO, follow: bool = False, parse_func: Callable | None = None) None[source]#
From the given text stream, read line-buffered lines and add them to the provided queue as 2-tuples, (line, parsed object).
This function is meant to be passed to an individual thread in which it can read individual lines separately from the main thread. Before exiting this function, the stream is closed.
- Parameters:
queue (queue.Queue) – The queue object to put the read line and parsed objects into.
stream (TextIO) – The stream identifier. If this is not a text stream, be aware that the elements added to the queue might need decoding.
follow (bool, optional) – Whether the _follow() function should be used instead of iter(stream.readline). This should be selected if the stream is file-like instead of sys.stdout-like.
parse_func (Callable, optional) – A parse function that the read line is passed through. This should be a unary function that either returns a successfully parsed line or None.
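The pattern described here (a reader thread that fills a queue with (line, parsed object) tuples) can be sketched with the standard library alone. The names enqueue_lines_sketch, parse_int and read_all are hypothetical, and stripping the trailing newline is an assumption of this sketch.

```python
import io
import queue
import threading
from typing import Callable, Optional, TextIO


def enqueue_lines_sketch(
    *, q: queue.Queue, stream: TextIO, parse_func: Optional[Callable] = None
) -> None:
    """Read lines from stream and put (line, parsed) tuples into q."""
    for line in iter(stream.readline, ""):
        line = line.rstrip("\n")
        parsed = parse_func(line) if parse_func is not None else None
        q.put((line, parsed))
    # The stream is closed before the function returns
    stream.close()


def parse_int(line: str):
    """Hypothetical parser: an int for purely numeric lines, else None."""
    return int(line) if line.isdigit() else None


def read_all(text: str) -> list:
    """Run the reader in its own thread and collect everything it queued."""
    q = queue.Queue()
    thread = threading.Thread(
        target=enqueue_lines_sketch,
        kwargs=dict(q=q, stream=io.StringIO(text), parse_func=parse_int),
    )
    thread.start()
    thread.join()
    items = []
    while not q.empty():
        items.append(q.get_nowait())
    return items
```

The main thread never touches the stream itself; it only drains the queue, which is what keeps reading non-blocking from its point of view.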
- utopya.task.parse_yaml_dict(line: str, *, start_str: str = '!!map') None | dict[source]#
A yaml parse function that can be passed to enqueue_lines. It only tries parsing the line if it starts with the provided start string.
It tries to decode the line, and parse it as a yaml. If that fails, it will still try to decode the string. If that fails yet again, the unchanged line will be returned.
- class utopya.task.Task(*, name: str | None = None, priority: float | None = None, callbacks: Dict[str, Callable] | None = None, progress_func: Callable | None = None, skippable: bool = True)[source]#
Bases: object
The Task is a container for a task handled by the WorkerManager.
It aims to provide the necessary interfaces for the WorkerManager to easily associate tasks with the corresponding workers and vice versa.
- __init__(*, name: str | None = None, priority: float | None = None, callbacks: Dict[str, Callable] | None = None, progress_func: Callable | None = None, skippable: bool = True)[source]#
Initialize a Task object.
- Parameters:
name (str, optional) – The task’s name. If none is given, the generated uuid will be used.
priority (float, optional) – The priority of this task; if None, default is +np.inf, i.e. the lowest priority. If two priority values are the same, the task created earlier has a higher priority.
callbacks (Dict[str, Callable], optional) – A dict of callback funcs that are called at different points of the life of this task. Each function is passed this task object as its only argument.
progress_func (Callable, optional) – Invoked by the progress property and used to calculate the progress given the current task object as argument
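The priority rule stated above (lower value first, +inf by default, earlier creation wins ties) can be illustrated with a small stand-in class. TaskSketch and processing_order are hypothetical names; the creation counter is an assumption made for this sketch and says nothing about how utopya implements the comparison.

```python
import itertools
import math
import queue
from dataclasses import dataclass, field

_creation_counter = itertools.count()


@dataclass(order=True)
class TaskSketch:
    # Compared first: a lower value means a higher priority
    priority: float = math.inf
    # Compared second: earlier-created tasks win ties
    created: int = field(default_factory=lambda: next(_creation_counter))
    # Not part of the ordering
    name: str = field(default="", compare=False)


def processing_order(tasks) -> list:
    """Return task names in the order a priority queue would hand them out."""
    pq = queue.PriorityQueue()
    for task in tasks:
        pq.put(task)
    return [pq.get().name for _ in tasks]


a = TaskSketch(name="a")                # default priority: +inf
b = TaskSketch(priority=0.0, name="b")
c = TaskSketch(priority=0.0, name="c")  # same priority as b, created later
order = processing_order([a, b, c])
```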
- _name#
- _priority#
- _uid#
- callbacks#
- _progress_func#
- _stop_conditions#
- _skipping#
- property progress: float#
If a progress function is given, invokes it; otherwise returns 0
This also performs checks that the progress is in [0, 1]
- property fulfilled_stop_conditions: Set[StopCondition]#
The set of fulfilled stop conditions for this task. Typically, this is set by the StopCondition itself as part of its evaluation in its fulfilled() method.
- property was_skipped: bool | None#
Whether this task was skipped. If None, the task has not finished yet, so the final evaluation is still open.
- class utopya.task.WorkerTask(*, setup_func: Callable | None = None, setup_kwargs: dict | None = None, worker_kwargs: dict | None = None, **task_kwargs)[source]#
Bases: Task
A specialisation of Task for use in the WorkerManager.
It is able to spawn a worker process using subprocess.Popen, executing the task in a non-blocking manner. At the same time, the worker’s stream can be read in via another non-blocking thread and stream information can be parsed. Furthermore, this class provides most of the interface for signalling the spawned process.
For an equivalent class that uses multiprocessing instead of subprocess, see the derived MPProcessTask.
- STREAM_PARSE_FUNCS = {'default': None, 'yaml_dict': <function parse_yaml_dict>}#
- __init__(*, setup_func: Callable | None = None, setup_kwargs: dict | None = None, worker_kwargs: dict | None = None, **task_kwargs)[source]#
Initialize a WorkerTask.
This is a specialization of Task for use in the WorkerManager.
- Parameters:
setup_func (Callable, optional) – The setup function to use before this task is spawned; this allows the worker arguments to be handled dynamically. It is called with the worker_kwargs keyword argument, containing the dict passed here. Additionally, setup_kwargs are unpacked into the function call. The function should return a dict that is then used as worker_kwargs for the individual task.
setup_kwargs (dict, optional) – The keyword arguments unpacked into the setup_func call.
worker_kwargs (dict, optional) – The keyword arguments needed to spawn the worker. Note that these are also passed to setup_func and, if a setup_func is given, the return value of that function will be used for the worker_kwargs.
**task_kwargs – Arguments to be passed to __init__(), including the callbacks dictionary among other things.
- Raises:
ValueError – If neither setup_func nor worker_kwargs were given, thus lacking information on how to spawn the worker.
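The interplay of setup_func, setup_kwargs and worker_kwargs described above can be sketched as follows. The helper resolve_worker_kwargs and the add_seed setup function are hypothetical; that worker_kwargs contains an args entry is an assumption based on the signatures of the private spawn helpers.

```python
from typing import Callable, Optional


def resolve_worker_kwargs(
    *,
    setup_func: Optional[Callable] = None,
    setup_kwargs: Optional[dict] = None,
    worker_kwargs: Optional[dict] = None,
) -> dict:
    """Return the kwargs that would be used to spawn the worker."""
    if setup_func is None and worker_kwargs is None:
        raise ValueError("Need setup_func or worker_kwargs to spawn a worker!")
    if setup_func is None:
        return worker_kwargs
    # worker_kwargs is passed by keyword; setup_kwargs are unpacked
    return setup_func(worker_kwargs=worker_kwargs, **(setup_kwargs or {}))


def add_seed(*, worker_kwargs: dict, seed: int) -> dict:
    """Hypothetical setup function: appends a seed to the worker arguments."""
    args = tuple(worker_kwargs["args"]) + (f"--seed={seed}",)
    return dict(worker_kwargs, args=args)


resolved = resolve_worker_kwargs(
    setup_func=add_seed,
    setup_kwargs=dict(seed=7),
    worker_kwargs=dict(args=("run_model",)),
)
```

Deferring this step to just before spawning means that expensive or time-dependent preparation only happens for tasks that are actually about to run.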
- setup_func#
- setup_kwargs#
- worker_kwargs#
- _worker#
- _worker_pid#
- _worker_status#
- streams#
- profiling#
- property worker_status: int | None#
The worker process’s current status or False, if there is no worker spawned yet.
Note that this invokes a poll to the worker process if one was spawned.
- Returns:
Current worker status. False, if there was no worker associated yet.
- Return type:
Union[int, None]
- property outstream_objs: List[dict]#
Returns the list of objects parsed from the ‘out’ stream, typically dictionaries.
If there are no streams, returns an empty list.
- spawn_worker() Popen | None[source]#
Spawn a worker process using subprocess.Popen and manage the corresponding queue and thread for reading the stdout stream.
If there is a setup_func, this function will be called first.
Afterwards, from the worker_kwargs returned by that function or from the ones given during initialisation (if no setup_func was given), the worker process is spawned and associated with this task.
- Returns:
The created process object
- Return type:
Popen | None
- Raises:
RuntimeError – If a worker was already spawned for this task.
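The general technique behind this method (a subprocess.Popen child whose stdout is drained by a separate thread into a queue) can be sketched with the standard library. The names _reader and run_and_collect are hypothetical, and unlike a manager that polls its workers, this sketch simply waits for the child to finish.

```python
import queue
import subprocess
import sys
import threading


def _reader(stream, q: queue.Queue) -> None:
    """Thread target: move lines from the child's stdout into the queue."""
    for line in iter(stream.readline, ""):
        q.put(line.rstrip("\n"))
    stream.close()


def run_and_collect(args: tuple) -> tuple:
    """Spawn a child process and return (exit code, list of stdout lines)."""
    proc = subprocess.Popen(
        args,
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
        text=True,
        bufsize=1,  # line-buffered
    )
    q = queue.Queue()
    thread = threading.Thread(target=_reader, args=(proc.stdout, q), daemon=True)
    thread.start()

    returncode = proc.wait()  # a polling loop would call proc.poll() instead
    thread.join()

    lines = []
    while not q.empty():
        lines.append(q.get_nowait())
    return returncode, lines


code, out = run_and_collect((sys.executable, "-c", "print('hello'); print('world')"))
```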
- read_streams(stream_names: list = 'all', *, max_num_reads: int = 10, forward_directly: bool = False) None[source]#
Read the streams associated with this task’s worker.
- Parameters:
stream_names (list, optional) – The list of stream names to read. If all (default), will read all streams.
max_num_reads (int, optional) – How many lines should be read from the buffer. For -1, reads the whole buffer.
Warning
Do not make this value too large as it could block the whole reader thread of this worker.
forward_directly (bool, optional) – Whether to call the forward_streams() method; this is done before the callback and can be useful if the callback should not happen before the streams are forwarded.
- Returns:
None
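The role of max_num_reads can be illustrated by a bounded, non-blocking drain of a queue. This is a sketch only; read_up_to is a hypothetical helper that assumes the stream lines are buffered in a queue.Queue.

```python
import queue


def read_up_to(q: queue.Queue, max_num_reads: int = 10) -> list:
    """Take at most max_num_reads items from q without blocking.

    A value of -1 drains everything that is currently available.
    """
    items = []
    while max_num_reads == -1 or len(items) < max_num_reads:
        try:
            items.append(q.get_nowait())
        except queue.Empty:
            break
    return items


buffer = queue.Queue()
for i in range(5):
    buffer.put(i)

first = read_up_to(buffer, 2)    # bounded read
rest = read_up_to(buffer, -1)    # read whatever is left
```

Bounding the number of reads keeps a single chatty worker from monopolising one polling cycle.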
- save_streams(stream_names: list = 'all', *, final: bool = False)[source]#
For each stream, checks if it is to be saved, and if yes: saves it.
The saving location is stored in the streams dict. The relevant keys are the save flag and the save_path string.
Note that this function does not save the whole stream log, but only those parts of the stream log that have not already been saved. The position up to which the stream was saved is stored under the lines_saved key in the stream dict.
- Parameters:
stream_names (list, optional) – The list of stream names to _check_. If ‘all’ (default), will check all streams whether the save flag is set.
save_raw (bool, optional) – If True, stores the raw log; otherwise stores the regular log, i.e. without the lines that were parseable.
final (bool, optional) – If True, this is regarded as the final save operation for the stream, which will lead to additional information being saved to the end of the log.
remove_ansi (bool, optional) – If True, will remove ANSI escape characters (e.g. from colored logging) from the log before saving to file.
- Returns:
None
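The incremental saving described above can be sketched as follows. The keys save, save_path and lines_saved are taken from the description; the log key holding the list of lines and the helper names save_new_lines and demo_incremental_save are assumptions made for this example.

```python
import os
import tempfile


def save_new_lines(stream: dict) -> int:
    """Append not-yet-saved log lines to the file; return how many were written."""
    if not stream.get("save"):
        return 0
    new_lines = stream["log"][stream["lines_saved"]:]
    if not new_lines:
        return 0
    with open(stream["save_path"], "a", encoding="utf8") as f:
        f.write("\n".join(new_lines) + "\n")
    # Remember the position up to which the log was saved
    stream["lines_saved"] += len(new_lines)
    return len(new_lines)


def demo_incremental_save() -> tuple:
    """Save twice and return (counts per call, final file content)."""
    path = os.path.join(tempfile.mkdtemp(), "out.log")
    stream = dict(save=True, save_path=path, lines_saved=0, log=["a", "b"])
    counts = [save_new_lines(stream)]
    stream["log"].append("c")
    counts.append(save_new_lines(stream))
    counts.append(save_new_lines(stream))  # nothing new this time
    with open(path, encoding="utf8") as f:
        return counts, f.read()
```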
- forward_streams(stream_names: list = 'all', forward_raw: bool = False) bool[source]#
Forwards the streams to stdout, either via logging module or print
This function can be periodically called to forward the part of the stream logs that was not already forwarded to stdout.
The information for that is stored in the stream dict. The log_level entry is used to determine whether the logging module should be used or (in case of None) the print method.
- signal_worker(signal: str) tuple[source]#
Sends a signal to this WorkerTask’s worker.
- Parameters:
signal (str) – The signal to send. Needs to be a valid signal name, i.e.: available in python signal module.
- Raises:
ValueError – When an invalid signal argument was given
- Returns:
(signal: str, signum: int) sent to the worker
- Return type:
tuple
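Resolving a signal name to its number, as required for the (signal, signum) return value, can be done with the standard signal module. The helper resolve_signal is a hypothetical name; the actual lookup utopya performs may differ.

```python
import signal


def resolve_signal(name: str) -> tuple:
    """Translate a signal name like 'SIGTERM' into (name, signum)."""
    try:
        sig = signal.Signals[name]
    except KeyError:
        raise ValueError(f"Invalid signal name: {name!r}") from None
    return name, int(sig)


name, signum = resolve_signal("SIGTERM")
```

The resulting number can then be passed to the send_signal method of a subprocess.Popen object.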
- _prepare_process_args(*, args: tuple, read_stdout: bool, **kwargs) Tuple[tuple, dict][source]#
Prepares the arguments that will be passed to subprocess.Popen
- _prepare_worker_kwargs() dict[source]#
Prepares worker kwargs; if a setup function is given, will invoke it to potentially update the existing worker kwargs.
- _invoke_setup_func() dict[source]#
Invokes the setup function, which returns potentially adjusted worker kwargs.
Also takes care of error handling.
- _spawn_process(args, **popen_kwargs)[source]#
This helper takes care only of spawning the actual process and potential error handling.
It can be subclassed to spawn a different kind of process
- _spawn_worker(*, args: tuple, popen_kwargs: dict | None = None, read_stdout: bool = True, **_) Popen[source]#
Helper function to spawn the worker subprocess
- _setup_stream_reader(stream_name: str, *, stream, parser: str = 'default', follow: bool = False, save_streams: bool = False, save_streams_to: str | None = None, save_raw: bool = True, remove_ansi: bool = False, forward_streams: bool = False, forward_raw: bool = True, streams_log_lvl: int | None = None, **_)[source]#
Sets up the stream reader thread
- _stop_stream_reader(name: str)[source]#
Stops the stream reader with the given name by closing the associated stream’s file handle.
- utopya.task._target_wrapper(target, streams: dict, *args, **kwargs)[source]#
A wrapper around the multiprocessing.Process target function which takes care of stream handling.
- class utopya.task.PopenMPProcess(args: tuple, kwargs: dict = {}, stdin=None, stdout=None, stderr=None, bufsize: int = -1, encoding: str = 'utf8', shell: bool | None = None)[source]#
Bases: object
A wrapper around multiprocessing.Process that replicates (wide parts of) the interface of subprocess.Popen.
- __init__(args: tuple, kwargs: dict = {}, stdin=None, stdout=None, stderr=None, bufsize: int = -1, encoding: str = 'utf8', shell: bool | None = None)[source]#
Creates a multiprocessing.Process and starts it.
The interface here is a subset of subprocess.Popen that makes those features available that make sense for a multiprocessing.Process, mainly: stream reading.
Consequently, the interface is quite a bit different to that of the multiprocessing.Process. The most important arguments of that interface are target, args, and kwargs, which can be set as follows:
target will be args[0]
args will be args[1:]
kwargs is an additional keyword argument that is not part of the subprocess.Popen interface typically.
Regarding the stream arguments, the following steps are done to attach custom pipes: If any argument is a subprocess.PIPE or another stream specifier that is not subprocess.DEVNULL, a new multiprocessing.Pipe and a reader thread will be established.
Warning
This will always use spawn as a start method for the process!
- Parameters:
args (tuple) – The target callable (args[0]) and subsequent positional arguments.
kwargs (dict, optional) – Keyword arguments for the target.
stdin (None, optional) – The stdin stream
stdout (None, optional) – The stdout stream
stderr (None, optional) – The stderr stream
bufsize (int, optional) – The buffersize to use.
encoding (str, optional) – The encoding to use for the streams; should typically remain utf8, using other values is not encouraged!
shell (bool, optional) – Ignored here, exists only to provide the same interface as with subprocess.Popen.
- _prepare_target_args(args: tuple, *, stdin, stdout, stderr) Tuple[Callable, tuple][source]#
Prepares the target callable and stream objects
- poll() int | None[source]#
Check if child process has terminated. Set and return returncode attribute. Otherwise, returns None.
With the underlying process being a multiprocessing.Process, this method is equivalent to the returncode property.
- wait(timeout=None)[source]#
Wait for the process to finish; blocking call.
This method is not yet implemented, but will be!
- communicate(input=None, timeout=None)[source]#
Communicate with the process.
This method is not yet implemented! Not sure if it will be …
- property args: tuple#
The args argument to this process. Note that the returned tuple includes the target callable as its first entry.
Note that these have already been passed to the process; changing them has no effect.
- property kwargs#
Keyword arguments passed to the target callable.
Note that these have already been passed to the process; changing them has no effect.
- property stdin#
The attached stdin stream
- property stdout#
The attached stdout stream
- property stderr#
The attached stderr stream
- property pid#
Process ID of the child process
- class utopya.task.MPProcessTask(*, setup_func: Callable | None = None, setup_kwargs: dict | None = None, worker_kwargs: dict | None = None, **task_kwargs)[source]#
Bases: WorkerTask
A WorkerTask specialization that uses multiprocessing.Process instead of subprocess.Popen.
It is mostly equivalent to WorkerTask but adjusts the private methods that take care of spawning the actual process and setting up the stream readers, such that the particularities of the PopenMPProcess wrapper are accounted for.
- _spawn_process(args, **popen_kwargs) PopenMPProcess[source]#
This helper takes care only of spawning the actual process and potential error handling. It returns a PopenMPProcess instance, which has the same interface as subprocess.Popen.
- _setup_stream_reader(*args, **kwargs)[source]#
Sets up the stream reader with follow=True, such that the file-like streams that PopenMPProcess uses can be read properly.
- _stop_stream_reader(name: str)[source]#
Stops the stream reader thread with the given name by telling its follow function to stop, thus ending iteration.
- setup_func#
- setup_kwargs#
- worker_kwargs#
- _worker#
- _worker_pid#
- _worker_status#
- streams#
- profiling#
- class utopya.task.TaskList[source]#
Bases: object
The TaskList stores Task objects, ensures that none is in there twice, and can be locked to prevent new tasks from being added.
- lock()[source]#
If called, the TaskList becomes locked and allows no further calls to the append method.
- append(val: Task)[source]#
Append a Task object to this TaskList
- Parameters:
val (Task) – The task to add
- Raises:
RuntimeError – If TaskList object was locked
TypeError – Tried to add a non-Task type object
ValueError – Task already added to this TaskList
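The three failure modes listed above can be illustrated with a small stand-in container. TaskListSketch and DummyTask are hypothetical names, and the identity-based duplicate check is an assumption of this sketch rather than a statement about utopya's implementation.

```python
class DummyTask:
    """Stand-in for a Task object."""


class TaskListSketch:
    """A lockable container that accepts each item only once."""

    def __init__(self, item_type: type = DummyTask):
        self._items = []
        self._item_type = item_type
        self._locked = False

    def __len__(self) -> int:
        return len(self._items)

    def __iter__(self):
        return iter(self._items)

    def lock(self) -> None:
        """After this call, append() refuses any further items."""
        self._locked = True

    def append(self, val) -> None:
        if self._locked:
            raise RuntimeError("The list is locked; cannot append.")
        if not isinstance(val, self._item_type):
            raise TypeError(f"Expected {self._item_type.__name__}, got {type(val).__name__}")
        if any(val is item for item in self._items):
            raise ValueError("This item was already added.")
        self._items.append(val)


tasks = TaskListSketch()
first_task = DummyTask()
tasks.append(first_task)
```

Locking is useful once work has started, because it guarantees that the set of tasks being reported on can no longer change.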
utopya.testtools module#
Tools that help testing models.
This mainly supplies the ModelTest class, which is a specialization of the
Model for usage in tests.
- class utopya.testtools.ModelTest(model_name: str, *, test_file: str | None = None, use_tmpdir: bool = True, **kwargs)[source]#
Bases: Model
A class to use for testing Utopia models.
It attaches to a certain model and makes it easy to load config files with which tests should be carried out.
- __init__(model_name: str, *, test_file: str | None = None, use_tmpdir: bool = True, **kwargs)[source]#
Initialize the ModelTest class for the given model name.
This is basically like the base class __init__ just that it sets the default value of use_tmpdir to True and renames TODO
- Parameters:
model_name (str) – Name of the model to test
test_file (str, optional) – The file this ModelTest is used in. If given, will look for config files relative to the folder this file is located in.
use_tmpdir (bool, optional) – Whether to use a temporary directory to write data to. The default value can be set here; but the flag can be overwritten in the create_mv and create_run_load methods. For false, the regular model output directory is used.
- Raises:
ValueError – If the directory extracted from test_file is invalid
utopya.tools module#
Implements generally useful functions, partly by importing from
dantro.tools
- utopya.tools.load_selected_keys(src: dict, *, add_to: dict, keys: Sequence[Tuple[str, type, bool]], err_msg_prefix: str | None = None, prohibit_unexpected: bool = True) None[source]#
Loads (only) selected keys from dict src into dict add_to.
- Parameters:
src (dict) – The dict to load values from
add_to (dict) – The dict to load values into
keys (Sequence[Tuple[str, type, bool]]) – Which keys to load, given as sequence of (key, allowed types, [required=False]) tuples.
err_msg_prefix (str) – A description string, used in error message
prohibit_unexpected (bool, optional) – Whether to raise on keys that were unexpected, i.e. not given in keys argument.
- Raises:
KeyError – On missing key in src
TypeError – On bad type of value in src
ValueError – On unexpected keys in src
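The selection and validation logic described above can be sketched as a standalone function. This is a hypothetical reimplementation for illustration (load_selected_keys_sketch), not utopya's code; it omits err_msg_prefix and assumes that a missing optional key is silently skipped.

```python
from typing import Sequence


def load_selected_keys_sketch(
    src: dict,
    *,
    add_to: dict,
    keys: Sequence[tuple],
    prohibit_unexpected: bool = True,
) -> None:
    """Copy selected, type-checked keys from src into add_to (in-place)."""
    expected = set()
    for spec in keys:
        key, allowed_types = spec[0], spec[1]
        required = spec[2] if len(spec) > 2 else False
        expected.add(key)

        if key not in src:
            if required:
                raise KeyError(f"Missing required key: {key}")
            continue
        if not isinstance(src[key], allowed_types):
            raise TypeError(f"Bad type for key '{key}': {type(src[key]).__name__}")
        add_to[key] = src[key]

    unexpected = set(src) - expected
    if prohibit_unexpected and unexpected:
        raise ValueError(f"Unexpected keys: {sorted(unexpected)}")


target = {}
load_selected_keys_sketch(
    {"num_steps": 3, "name": "run"},
    add_to=target,
    keys=[("num_steps", int, True), ("name", str), ("scale", (int, float))],
)
```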
- utopya.tools.add_item(value, *, add_to: dict, key_path: Sequence[str], value_func: Callable | None = None, is_valid: Callable | None = None, ErrorMsg: Callable | None = None) None[source]#
Adds the given value to the add_to dict, traversing the given key path. This operation happens in-place.
- Parameters:
value – The value of what is to be stored. If this is a callable, the result of the call is stored.
add_to (dict) – The dict to add the entry to
key_path (Sequence[str]) – The path at which to add it
value_func (Callable, optional) – If given, calls it with value as argument and uses the return value to add to the dict
is_valid (Callable, optional) – Used to determine whether value is valid or not; should take single positional argument, return bool
ErrorMsg (Callable, optional) – A raisable object that prints an error message; gets passed value as positional argument.
- Raises:
Exception – type depends on specified ErrorMsg callable
- utopya.tools.pprint(obj: Any, **kwargs)[source]#
Prints a “pretty” string representation of the given object.
- Parameters:
obj (Any) – The object to print
**kwargs – Passed to print
- utopya.tools.pformat(obj: Any) str[source]#
Creates a “pretty” string representation of the given object.
This is achieved by creating a yaml representation.
Todo
Improve parsing of leaf-level mappings
- utopya.tools.ensure_not_None(d: ~typing.Any | None, fallback: type | ~typing.Callable = <class 'dict'>) Any[source]#
Returns d if it is not None, otherwise creates a new object by calling fallback without any arguments.
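A helper like this is typically used to avoid mutable default arguments. The following sketch reimplements the described behaviour under the hypothetical name ensure_not_none_sketch and shows one such use in the equally hypothetical collect function.

```python
from typing import Any, Callable, Optional


def ensure_not_none_sketch(d: Optional[Any], fallback: Callable = dict) -> Any:
    """Return d unless it is None; in that case call fallback()."""
    return d if d is not None else fallback()


def collect(item, into: Optional[list] = None) -> list:
    """Append item to the given list, creating a fresh list if none is given."""
    into = ensure_not_none_sketch(into, list)
    into.append(item)
    return into
```

Note that only None triggers the fallback: falsy values such as 0 or an empty list are returned unchanged.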
- utopya.tools.parse_si_multiplier(s: str) int[source]#
Parses a string like 1.23M or -2.34 k into an integer.
If it is a string, parses the SI multiplier and returns the appropriate integer for use as number of simulation steps. Supported multipliers are k, M, G and T. These need to be used as the suffix of the string.
Note
This is only intended to be used with integer values and does not support float values like 1u.
The used regex can be found here.
- Parameters:
s (str) – A string representing an integer number, potentially including a supported SI multiplier as suffix.
- Returns:
The parsed number of steps as integer. If the value has decimal places, integer rounding is applied.
- Return type:
- Raises:
ValueError – Upon string that does not match the expected pattern
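The parsing described above can be sketched as follows. The regular expression and the name parse_si_sketch are assumptions of this example, since the actual pattern is not reproduced in this section; Decimal is used to avoid floating-point artefacts, and fractional results are truncated towards zero, which is one possible reading of "integer rounding".

```python
import re
from decimal import Decimal

# Supported multipliers, plus the empty suffix for plain numbers
_SI_FACTORS = {"": 1, "k": 10**3, "M": 10**6, "G": 10**9, "T": 10**12}

# Hypothetical pattern: signed decimal number, optional space, optional suffix
_SI_PATTERN = re.compile(r"^\s*([+-]?\d+(?:\.\d+)?)\s*([kMGT]?)\s*$")


def parse_si_sketch(s: str) -> int:
    """Parse strings like '1.23M' or '-2.34 k' into an integer."""
    match = _SI_PATTERN.match(s)
    if match is None:
        raise ValueError(f"String {s!r} does not match the expected pattern")
    number, suffix = match.groups()
    return int(Decimal(number) * _SI_FACTORS[suffix])


steps = parse_si_sketch("1.23M")
```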
- utopya.tools.parse_num_steps(N: str | int, *, raise_if_negative: bool = True) int[source]#
Given a string like 1.23M or an integer, prepares the num_steps argument for a single universe simulation.
For string arguments, uses parse_si_multiplier() for string parsing. If that fails, attempts to read it in float notation by calling int(float(N)).
Note
This function always applies integer rounding.
- Parameters:
- Returns:
The parsed value for num_steps
- Return type:
int
- Raises:
ValueError – Result invalid, i.e. not parseable or of negative value.
utopya.workermanager module#
The WorkerManager is a central part of utopya in that it
spawns and controls the tasks (WorkerTask) that are
to be worked on.
- utopya.workermanager.STOPCOND_EXIT_CODES: Sequence[int] = (-10, 10, 138)#
Exit codes of a WorkerTask that will be interpreted as stemming from a stop condition. This depends on the signal used for stop conditions (utopya.stop_conditions.SIG_STOPCOND). This sequence of possible exit codes takes into account that the sign may be switched (depending on whether a signed or unsigned integer convention is used) or where a convention is used such that a handled signal is turned into an exit code of 128 + abs(signum).
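How such a sequence follows from a signal number can be sketched in a few lines. The signal number 10 is assumed here only because it reproduces the tuple shown above; the function names are hypothetical.

```python
def stopcond_exit_codes(signum: int) -> tuple:
    """Exit codes that may result from a process receiving the given signal."""
    return (-signum, signum, 128 + abs(signum))


def looks_like_stop_condition(exit_code: int, signum: int = 10) -> bool:
    """Whether an exit code matches one of the conventions described above."""
    return exit_code in stopcond_exit_codes(signum)


codes = stopcond_exit_codes(10)
```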
- utopya.workermanager.POLL_DELAY_WARNING_THRS: float = 0.001#
Below this poll_delay time, will issue a warning.
- class utopya.workermanager.WorkerManager(num_workers: int | str = 'auto', poll_delay: float = 0.05, spawn_rate: int = -1, lines_per_poll: int = 50, periodic_task_callback: int | None = None, QueueCls: type = <class 'queue.Queue'>, reporter: ~utopya.reporter.WorkerManagerReporter | None = None, rf_spec: ~typing.Dict[str, str | ~typing.List[str]] | None = None, save_streams_on: ~typing.Sequence[str] = (), nonzero_exit_handling: str = 'ignore', interrupt_params: dict | None = None, cluster_mode: bool = False, resolved_cluster_params: dict | None = None)[source]#
Bases: object
The WorkerManager class orchestrates WorkerTask objects: setting them up, invoking them, tracking their progress, and starting new workers if previous workers finished.
The WorkerManager also keeps track of the status of tasks (as far as can be done without assuming too much about the tasks themselves). It then categorizes the tasks accordingly, using the following groups:
invoked: grabbed from the task queue, nothing else done yet
spawned: setup function completed and process spawned
active: currently assigned to workers and not finished yet
succeeded: finished successfully (exit status 0)
stopped: stopped via a deliberately sent signal from the ‘outside’, e.g. a stop condition or a keyboard interrupt.
skipped: skipped some time after invocation, e.g. as was determined by the setup function.
failed: failed for some task-specific reason (exit code != 0)
finished: union of success, stop, fail, and skip, i.e. all tasks that are no longer in the queue but also no longer active.
- __init__(num_workers: int | str = 'auto', poll_delay: float = 0.05, spawn_rate: int = -1, lines_per_poll: int = 50, periodic_task_callback: int | None = None, QueueCls: type = <class 'queue.Queue'>, reporter: ~utopya.reporter.WorkerManagerReporter | None = None, rf_spec: ~typing.Dict[str, str | ~typing.List[str]] | None = None, save_streams_on: ~typing.Sequence[str] = (), nonzero_exit_handling: str = 'ignore', interrupt_params: dict | None = None, cluster_mode: bool = False, resolved_cluster_params: dict | None = None)[source]#
Initialize the worker manager.
- Parameters:
num_workers (Union[int, str], optional) – The number of workers that can work in parallel. If ‘auto’ (default), uses os.cpu_count(). If below zero, deducts abs(num_workers) from the CPU count.
poll_delay (float, optional) – How long (in seconds) the delay between worker polls should be. For too small delays (<0.01), the CPU load will become significant.
spawn_rate (int, optional) – How many workers to spawn each working loop iteration. If -1, will assign new tasks to all free workers.
lines_per_poll (int, optional) – How many lines to read from each stream during polling of the tasks. This value should not be too large, otherwise the polling is delayed by too much. By setting it to -1, all available lines are read.
periodic_task_callback (int, optional) – If given, an additional task callback will be invoked after every periodic_task_callback poll events.
QueueCls (type, optional) – Which class to use for the queue. Defaults to the FiFo queue.Queue.
reporter (WorkerManagerReporter, optional) – The reporter associated with this WorkerManager, reporting on the progress.
rf_spec (Dict[str, Union[str, List[str]]], optional) – The names of report formats that should be invoked at different points of the WorkerManager’s operation. Possible keys are: before_working, while_working, after_work, after_cancel, after_fail, task_spawn, task_finished, task_skipped. All other keys are silently ignored.
The values of the dict can be either strings or lists of strings, where the strings always refer to report formats registered with the WorkerManagerReporter. This argument updates the default report format specifications.
save_streams_on (Sequence[str], optional) – On which events to invoke save_streams() during work. Should be a sequence containing one or both of the keys on_monitor_update, periodic_callback.
nonzero_exit_handling (str, optional) – How to react if a WorkerTask exits with a non-zero exit code. For ‘ignore’, nothing happens. For ‘warn’, a warning is printed and the last 5 lines of the log are shown. For ‘raise’, the last 20 lines of the log are shown, all other tasks are terminated, and the WorkerManager exits with the same exit code as the WorkerTask exited with. Note that ‘warn’ will not lead to any messages if the worker died by SIGTERM, which presumably originated from a fulfilled stop condition. Use ‘warn_all’ to also receive warnings in this case.
interrupt_params (dict, optional) – Parameters that determine how the WorkerManager behaves when receiving KeyboardInterrupts during working. Possible keys:
send_signal: Which signal to send to the workers. Can be SIGINT (default), SIGTERM, SIGKILL, or any valid signal as integer.
grace_period: How long to wait for the other workers to gracefully shut down. After this period (in seconds), the workers will be killed via SIGKILL. Default is 5s.
exit: Whether to sys.exit at the end of start_working. Default is True.
cluster_mode (bool, optional) – Whether tasks similar to those managed by this WorkerManager are, at the same time, worked on by other WorkerManager instances. This is relevant because the output of files might be affected by whether another WorkerManager instance is currently working on the same output directory. Also, in the future, this argument might be used to communicate between nodes.
resolved_cluster_params (dict, optional) – The corresponding cluster parameters.
- Raises:
ValueError – For too negative num_workers argument
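The resolution of the num_workers argument can be sketched as below. The helper resolve_num_workers and its cpu_count parameter (added so the example is reproducible) are hypothetical; raising for any result below one worker is an assumption about what "too negative" means.

```python
import os
from typing import Optional, Union


def resolve_num_workers(
    num_workers: Union[int, str] = "auto", cpu_count: Optional[int] = None
) -> int:
    """Turn 'auto', a positive, or a negative value into a worker count."""
    cpus = cpu_count if cpu_count is not None else (os.cpu_count() or 1)

    if num_workers == "auto":
        return cpus

    # Negative values are interpreted relative to the CPU count
    resolved = cpus + num_workers if num_workers < 0 else num_workers
    if resolved < 1:
        raise ValueError(
            f"num_workers={num_workers} leaves no worker on a {cpus}-CPU machine"
        )
    return resolved


workers = resolve_num_workers(-2, cpu_count=8)
```

With this convention, a value of -1 keeps one CPU free for the main thread and other processes.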
- pending_exceptions: Queue = None#
A (FiFo) queue of Exception objects that will be handled by the WorkerManager during working. This is the interface that allows for other threads that have access to the WorkerManager to add an exception and let it be handled in the main thread.
- rf_spec: dict = None#
The report format specifications that are used throughout the WorkerManager. These are invoked at different points of the operation:
before_working
while_working
after_work
after_cancel
after_fail
task_spawn
task_finished
task_skipped
periodic
- property task_count: int#
Returns the number of tasks that this manager ever took care of. Careful: This is NOT the current number of tasks in the queue!
- property active_tasks: List[WorkerTask]#
The list of currently active tasks.
Note that this information might not be up-to-date; a process might quit just after the list has been updated.
- property num_finished_tasks: int#
The number of finished tasks. Incremented whenever a task leaves the active_tasks list, regardless of its exit status.
- property stop_conditions: Set[StopCondition]#
All stop conditions that were ever passed to start_working() during the life time of this WorkerManager.
- property nonzero_exit_handling: str#
Behavior upon a worker exiting with a non-zero exit code.
with ignore, nothing happens
with warn, a warning is printed
with raise, the log is shown and the WorkerManager exits with the same exit code as the corresponding WorkerTask exited with.
- property reporter: WorkerManagerReporter | None#
The associated WorkerManagerReporter or None, if no reporter is set.
- property resolved_cluster_params: dict#
Returns a copy of the cluster configuration with all parameters resolved (thus making some additional keys available on the top level). It is returned as a deep copy to avoid mutability issues.
- add_task(*, TaskCls: type = <class 'utopya.task.WorkerTask'>, **task_kwargs) WorkerTask[source]#
Adds a task to the WorkerManager.
- Parameters:
TaskCls (type, optional) – The WorkerTask-like type to use
**task_kwargs – All arguments needed for WorkerTask initialization. See utopya.task.WorkerTask for all valid arguments.
- Returns:
The created WorkerTask object
- Return type:
WorkerTask
- start_working(*, shuffle_tasks: bool = False, timeout: float | None = None, stop_conditions: Sequence[StopCondition] | None = None, post_poll_func: Callable | None = None) None[source]#
Upon call, all enqueued tasks will be worked on sequentially.
- Parameters:
detach (bool, optional) – If False (default), the WorkerManager will block here, as it continuously polls the workers and distributes tasks.
timeout (float, optional) – If given, the number of seconds this work session is allowed to take. Workers will be canceled if the number is exceeded. Note that this is not measured in CPU time, but the host systems wall time. If zero, timeout occurs immediately. If None or negative, there will be no timeout.
stop_conditions (Sequence[StopCondition], optional) – During the run these StopCondition objects will be checked
post_poll_func (Callable, optional) – If given, this is called after all workers have been polled. It can be used to perform custom actions during the polling loop.
- Raises:
ValueError – For invalid (i.e., negative) timeout value
WorkerManagerTotalTimeout – Upon a total timeout
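The general shape of such a working loop (fill free worker slots, poll the active workers, check the wall-time timeout) can be sketched with fake workers. Everything here is hypothetical: FakeWorker stands in for a spawned process, the built-in TimeoutError stands in for WorkerManagerTotalTimeout, and only None is treated as "no timeout".

```python
import queue
import time
from typing import Optional


class FakeWorker:
    """Stand-in for a process: finishes after a fixed number of polls."""

    def __init__(self, polls_needed: int):
        self._polls_left = polls_needed

    def poll(self) -> Optional[int]:
        # Mimics Popen.poll(): None while running, exit code once finished
        if self._polls_left > 0:
            self._polls_left -= 1
            return None
        return 0


def work_sketch(workers, *, num_workers: int, poll_delay: float = 0.0,
                timeout: Optional[float] = None) -> tuple:
    """Work through all workers; return (number finished, max. concurrency)."""
    pending = queue.Queue()
    for worker in workers:
        pending.put(worker)

    active, finished, max_active = [], [], 0
    start = time.monotonic()

    while active or not pending.empty():
        # Assign pending tasks to free worker slots
        while len(active) < num_workers and not pending.empty():
            active.append(pending.get_nowait())
        max_active = max(max_active, len(active))

        # Poll all active workers and sort out the finished ones
        still_running = []
        for worker in active:
            (still_running if worker.poll() is None else finished).append(worker)
        active = still_running

        # Wall-time timeout; a value of zero triggers immediately
        if timeout is not None and time.monotonic() - start >= timeout:
            raise TimeoutError("Total timeout exceeded")
        time.sleep(poll_delay)

    return len(finished), max_active


result = work_sketch([FakeWorker(n) for n in (1, 2, 3, 1)], num_workers=2)
```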
- _invoke_report(rf_spec_name: str, *args, **kwargs)[source]#
Helper function to invoke the reporter’s report function
- _parse_timeout_args(*, timeout: float | None) float | None[source]#
Parses timeout-related arguments
- _parse_stop_conditions(stop_conditions: list | None) List[StopCondition] | None[source]#
Prepare stop conditions, creating the corresponding objects if needed.
- _grab_task() WorkerTask[source]#
Will initiate that a task is gotten from the queue and that it spawns its worker process.
- Returns:
The WorkerTask grabbed from the queue.
- Return type:
WorkerTask
- Raises:
queue.Empty – If the task queue was empty
- _assign_tasks(num_free_workers: int) Tuple[int, int][source]#
Assigns tasks to (at most) num_free_workers and returns the number of spawned and skipped tasks.
The spawn_rate of the WorkerManager will determine how many are actually spawned.
Note
Skipped tasks do not count into the number of spawned tasks.
- _poll_workers() int[source]#
Will poll all workers that are in the active tasks list.
If they have finished, this will effectively invoke their callbacks, which will in turn remove them from the active tasks list.
- _check_stop_conds(stop_conds: Sequence[StopCondition]) Set[WorkerTask][source]#
Checks the given stop conditions for the active tasks and compiles a list of tasks that need to be terminated.
- Parameters:
stop_conds (Sequence[StopCondition]) – The stop conditions that are to be checked.
- Returns:
The WorkerTasks whose workers need to be terminated
- Return type:
List[WorkerTask]
- _signal_workers(tasks: str | List[WorkerTask], *, signal: str | int) None[source]#
Send signals to a list of WorkerTasks.
- Parameters:
tasks (Union[str, List[WorkerTask]]) – strings ‘all’ or ‘active’ or a list of WorkerTasks to signal
- _handle_pending_exceptions() None[source]#
This method handles the list of pending exceptions during working, starting from the one added most recently.
As the WorkerManager occupies the main thread, it is difficult for other threads to signal to the WorkerManager that an exception occurred. The pending_exceptions attribute allows such a handling; child threads can just add an exception object to it and they get handled during working of the WorkerManager.
This method handles the following exception types in a specific manner:
WorkerTaskStopConditionFulfilled: never raising or logging
WorkerTaskNonZeroExit: raising or logging depending on the value of the nonzero_exit_handling property
- Returns:
None
- Raises:
Exception – The exception that was added first to the queue of pending exceptions
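The underlying pattern (child threads hand exceptions to the main thread through a queue) can be sketched as follows. The class StopConditionFulfilled is a hypothetical stand-in for WorkerTaskStopConditionFulfilled, handle_pending is a hypothetical helper, and processing the queue in FIFO order is an assumption of this sketch.

```python
import queue
import threading


class StopConditionFulfilled(Exception):
    """Stand-in for an exception type that is handled silently."""


def handle_pending(pending: queue.Queue) -> list:
    """Handle queued exceptions in the calling (main) thread.

    Silently handled exceptions are returned; the first other one is raised.
    """
    ignored = []
    while True:
        try:
            exc = pending.get_nowait()
        except queue.Empty:
            return ignored
        if isinstance(exc, StopConditionFulfilled):
            ignored.append(exc)
            continue
        raise exc


pending_exceptions = queue.Queue()

# A child thread reports an exception instead of raising it itself
reporter_thread = threading.Thread(
    target=pending_exceptions.put, args=(StopConditionFulfilled("done"),)
)
reporter_thread.start()
reporter_thread.join()

handled = handle_pending(pending_exceptions)
```

Because queue.Queue is thread-safe, the reporting threads need no additional locking.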
utopya.yaml module#
Takes care of the YAML setup for Utopya.
In the module import order, this module needs to be downstream from all modules that implement objects that require a custom YAML representation.
- utopya.yaml._parameter_shorthand_constructor(loader, node) Parameter[source]#
Constructs a Parameter object from a scalar YAML node using scalar_node_to_object().
The YAML tag is used as shorthand mode argument to the from_shorthand class method.