utopya package#

The utopya package provides a simulation management and evaluation framework.

For a real-world example of how utopya can be integrated, have a look at the Utopia modelling framework which uses utopya as its frontend. For model implementations, the utopya_backend package can assist in building Python-based models that use utopya as a frontend.

Also visit the user manual front page for more information.

utopya.__version__ = '1.2.2'#

The utopya package version

Subpackages#

Submodules#

utopya._cluster module#

This module holds functions used in the Multiverse’s cluster mode

utopya._cluster.parse_node_list(node_list_str: str, *, mode: str, rcps: dict) List[str][source]#

Parses the node list to a list of node names and checks against the given resolved cluster parameters.

Depending on mode, different forms of the node list are parsable. For condensed mode:

node042
node[002,004-011,016]
m05s[0204,0402,0504]
m05s[0204,0402,0504],m08s[0504,0604,0701],m13s0603,m14s[0501-0502]
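
A minimal sketch of how condensed node lists like the ones above could be expanded into individual node names. The helper expand_condensed is hypothetical and not part of utopya; it ignores the mode and rcps arguments and does not check the result against any cluster parameters.

```python
import re
from typing import List


def expand_condensed(node_list_str: str) -> List[str]:
    """Expand a condensed node list into individual node names.

    Hypothetical helper for illustration; not utopya's implementation.
    """
    nodes = []
    # Split on commas that are not enclosed in square brackets
    for segment in re.split(r",(?![^\[]*\])", node_list_str):
        match = re.fullmatch(r"([^\[\]]+)\[([^\[\]]+)\]", segment)
        if match is None:
            # Plain node name without a bracket expression
            nodes.append(segment)
            continue

        prefix, items = match.groups()
        for item in items.split(","):
            if "-" in item:
                # Ranges are inclusive and keep their zero padding
                start, stop = item.split("-")
                width = len(start)
                for num in range(int(start), int(stop) + 1):
                    nodes.append(f"{prefix}{num:0{width}d}")
            else:
                nodes.append(f"{prefix}{item}")
    return nodes
```

With this sketch, node[002,004-011,016] expands to ten names, from node002 to node016.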

utopya._import_tools module#

Helper module that contains tools useful for module imports or manipulation of the system path. These are not implemented here but in dantro._import_tools.

Deprecated since version 1.0.1: This module is deprecated. Use dantro._import_tools or utopya_backend.tools.import_package_from_dir() instead.

utopya._signal module#

Implements signalling-related functionality and globally relevant data

utopya._signal.SIGMAP = {'SIGABRT': 6, 'SIGALRM': 14, 'SIGBUS': 7, 'SIGCHLD': 17, 'SIGCLD': 17, 'SIGCONT': 18, 'SIGFPE': 8, 'SIGHUP': 1, 'SIGILL': 4, 'SIGINT': 2, 'SIGIO': 29, 'SIGIOT': 6, 'SIGKILL': 9, 'SIGPIPE': 13, 'SIGPOLL': 29, 'SIGPROF': 27, 'SIGPWR': 30, 'SIGQUIT': 3, 'SIGRTMAX': 64, 'SIGRTMIN': 34, 'SIGSEGV': 11, 'SIGSTOP': 19, 'SIGSYS': 31, 'SIGTERM': 15, 'SIGTRAP': 5, 'SIGTSTP': 20, 'SIGTTIN': 21, 'SIGTTOU': 22, 'SIGURG': 23, 'SIGUSR1': 10, 'SIGUSR2': 12, 'SIGVTALRM': 26, 'SIGWINCH': 28, 'SIGXCPU': 24, 'SIGXFSZ': 25, 'SIG_BLOCK': 0, 'SIG_DFL': 0, 'SIG_IGN': 1, 'SIG_SETMASK': 2, 'SIG_UNBLOCK': 1}#

A map from signal names to corresponding integer exit codes
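
A map of this shape can be built from Python's standard signal module, as sketched below. Whether utopya constructs SIGMAP in exactly this way is an assumption; the integer values are platform-dependent, and those listed above correspond to a Linux build.

```python
import signal

# Collect every attribute of the signal module whose name starts with "SIG".
# This also picks up entries like SIG_BLOCK or SIG_DFL, which appear in the
# map above as well.
sigmap = {
    name: int(getattr(signal, name))
    for name in dir(signal)
    if name.startswith("SIG")
}

print(sigmap["SIGINT"], sigmap["SIGTERM"])
```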

utopya._yaml module#

Supplies basic YAML interface, inherited from dantro and paramspace

utopya.batch module#

Implements batch running and evaluation of simulations

utopya.batch._BTM_BASE_CFG_PATH = '/home/docs/checkouts/readthedocs.org/user_builds/utopya/checkouts/latest/utopya/cfg/btm_cfg.yml'#

Base configuration path of the batch task manager

utopya.batch._BTM_BASE_CFG = {'cluster_mode': False, 'debug': False, 'parallelization_level': 'batch', 'paths': {'note': None, 'out_dir': '~/utopya_output/_batch'}, 'reporter': {'report_formats': {'progress_bar': {'info_fstr': '{total_progress:>5.1f}% ({cnt[finished]} / {cnt[total]})', 'min_report_intv': 0.5, 'num_cols': 'adaptive', 'parser': 'progress_bar', 'show_times': True, 'times_fstr': '| {elapsed} elapsed', 'times_fstr_final': '| finished in {elapsed:} ', 'times_kwargs': {'mode': 'from_start'}, 'write_to': 'stdout_noreturn'}, 'report_file': {'min_num': 4, 'min_report_intv': 10, 'parser': 'report', 'show_individual_runtimes': True, 'task_label_plural': 'tasks', 'task_label_singular': 'task', 'write_to': {'file': {'path': '_report.txt'}}}}}, 'run_kwargs': {'timeout': None}, 'task_defaults': {'eval': {'create_symlinks': True, 'data_manager': {'out_dir_kwargs': {'exist_ok': False}}, 'out_dir': '{task_name:}', 'plot_manager': {'raise_exc': True}, 'priority': None}, 'run': {}}, 'tasks': {'eval': {}, 'run': {}}, 'worker_kwargs': {'forward_raw': True, 'forward_streams': False, 'popen_kwargs': {}, 'remove_ansi': True, 'save_raw': True, 'save_streams': True, 'streams_log_lvl': None}, 'worker_manager': {'interrupt_params': {'exit': False, 'grace_period': 5.0, 'send_signal': 'SIGTERM'}, 'lines_per_poll': 20, 'nonzero_exit_handling': 'warn_all', 'num_workers': 'auto', 'periodic_task_callback': 20, 'poll_delay': 0.05, 'rf_spec': {'after_abort': ['progress_bar', 'report_file'], 'after_work': ['progress_bar', 'report_file'], 'before_working': [], 'monitor_updated': ['progress_bar'], 'task_finished': ['progress_bar', 'report_file'], 'task_spawned': ['progress_bar'], 'while_working': ['progress_bar']}, 'save_streams_on': ['periodic_callback']}}#

Actual base configuration of the batch task manager
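
The info_fstr entry of the progress_bar report format above is a regular Python format string. The sketch below shows how it renders for example values; that the reporter passes total_progress and cnt as keyword arguments is an assumption derived from the field names in the string.

```python
info_fstr = "{total_progress:>5.1f}% ({cnt[finished]} / {cnt[total]})"

# Example values, chosen for illustration only
line = info_fstr.format(
    total_progress=42.0,
    cnt={"finished": 21, "total": 50},
)
print(repr(line))  # ' 42.0% (21 / 50)'
```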

utopya.batch._BTM_USER_DEFAULTS = {}#

User defaults for the batch task manager

utopya.batch._BTM_DEFAULTS = {'cluster_mode': False, 'debug': False, 'parallelization_level': 'batch', 'paths': {'note': None, 'out_dir': '~/utopya_output/_batch'}, 'reporter': {'report_formats': {'progress_bar': {'info_fstr': '{total_progress:>5.1f}% ({cnt[finished]} / {cnt[total]})', 'min_report_intv': 0.5, 'num_cols': 'adaptive', 'parser': 'progress_bar', 'show_times': True, 'times_fstr': '| {elapsed} elapsed', 'times_fstr_final': '| finished in {elapsed:} ', 'times_kwargs': {'mode': 'from_start'}, 'write_to': 'stdout_noreturn'}, 'report_file': {'min_num': 4, 'min_report_intv': 10, 'parser': 'report', 'show_individual_runtimes': True, 'task_label_plural': 'tasks', 'task_label_singular': 'task', 'write_to': {'file': {'path': '_report.txt'}}}}}, 'run_kwargs': {'timeout': None}, 'task_defaults': {'eval': {'create_symlinks': True, 'data_manager': {'out_dir_kwargs': {'exist_ok': False}}, 'out_dir': '{task_name:}', 'plot_manager': {'raise_exc': True}, 'priority': None}, 'run': {}}, 'tasks': {'eval': {}, 'run': {}}, 'worker_kwargs': {'forward_raw': True, 'forward_streams': False, 'popen_kwargs': {}, 'remove_ansi': True, 'save_raw': True, 'save_streams': True, 'streams_log_lvl': None}, 'worker_manager': {'interrupt_params': {'exit': False, 'grace_period': 5.0, 'send_signal': 'SIGTERM'}, 'lines_per_poll': 20, 'nonzero_exit_handling': 'warn_all', 'num_workers': 'auto', 'periodic_task_callback': 20, 'poll_delay': 0.05, 'rf_spec': {'after_abort': ['progress_bar', 'report_file'], 'after_work': ['progress_bar', 'report_file'], 'before_working': [], 'monitor_updated': ['progress_bar'], 'task_finished': ['progress_bar', 'report_file'], 'task_spawned': ['progress_bar'], 'while_working': ['progress_bar']}, 'save_streams_on': ['periodic_callback']}}#

Aggregated and recursively updated default batch task manager config
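
A minimal sketch of what "recursively updated" means here: nested dicts are merged key by key instead of being replaced as a whole. The recursive_update function below is a stand-in written for this example, not the helper utopya uses, and the configuration values are shortened examples.

```python
import copy


def recursive_update(base: dict, update: dict) -> dict:
    """Return a copy of ``base`` that is recursively updated with ``update``."""
    result = copy.deepcopy(base)
    for key, value in update.items():
        if isinstance(value, dict) and isinstance(result.get(key), dict):
            # Both sides are dicts: descend instead of overwriting
            result[key] = recursive_update(result[key], value)
        else:
            result[key] = copy.deepcopy(value)
    return result


base_cfg = {
    "debug": False,
    "worker_manager": {"num_workers": "auto", "poll_delay": 0.05},
}
user_defaults = {"worker_manager": {"num_workers": 4}}

defaults = recursive_update(base_cfg, user_defaults)
```

Here, defaults keeps poll_delay from the base configuration while num_workers is taken from the user defaults.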

utopya.batch.INVALID_TASK_NAME_CHARS = ('/', ':', '.', '?', '*')#

Substrings that may not appear in task names
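
A task name can be checked against these substrings as sketched below. The find_invalid_chars helper is hypothetical; how and where utopya performs this check is not shown here.

```python
INVALID_TASK_NAME_CHARS = ("/", ":", ".", "?", "*")


def find_invalid_chars(task_name: str) -> list:
    """Return the invalid substrings that occur in the given task name."""
    return [s for s in INVALID_TASK_NAME_CHARS if s in task_name]


print(find_invalid_chars("my_task"))  # []
print(find_invalid_chars("a/b.c"))    # ['/', '.']
```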

utopya.batch._eval_task(*, task_name: str, _batch_name: str, _batch_dirs: dict, _task_cfg_path: str, _create_symlinks: bool, model_name: str, model_kwargs: dict = {}, use_data_tree_cache: Optional[bool] = None, print_tree: Union[bool, str] = 'condensed', plot_only: Optional[Sequence[str]] = None, plots_cfg: Optional[str] = None, update_plots_cfg: dict = {}, **frozen_mv_kwargs)[source]#

The evaluation task target for the multiprocessing.Process. It sets up a utopya.model.Model, loads the data, and performs plots.

class utopya.batch.BatchTaskManager(*, batch_cfg_path: Optional[str] = None, **update_batch_cfg)[source]#

Bases: object

A manager for batch tasks

RUN_DIR_TIME_FSTR = '%y%m%d-%H%M%S'#

The time format string for the run directory
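
The format string uses the standard strftime codes. As an illustration with a fixed example date:

```python
from datetime import datetime

RUN_DIR_TIME_FSTR = "%y%m%d-%H%M%S"

# Two-digit year, month, day, then hours, minutes, seconds
timestamp = datetime(2018, 3, 1, 12, 54, 10).strftime(RUN_DIR_TIME_FSTR)
print(timestamp)  # 180301-125410
```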

__init__(*, batch_cfg_path: Optional[str] = None, **update_batch_cfg)[source]#

Sets up a BatchTaskManager.

Parameters
  • batch_cfg_path (str, optional) – The batch file with all the task definitions.

  • **update_batch_cfg – Additional arguments that are used to update the batch configuration.

Raises

NotImplementedError – If run_tasks or cluster_mode were set in the batch configuration.

property debug: bool#

Whether debug mode was enabled.

property parallelization_level: str#
property run_defaults: dict#

A deepcopy of the run task defaults

property eval_defaults: dict#

A deepcopy of the eval task defaults

property dirs: dict#

The directories associated with this BatchTaskManager

perform_tasks()[source]#

Perform all run and eval tasks.

static _setup_batch_cfg(batch_cfg_path: str, **update_batch_cfg) dict[source]#

Sets up the BatchTaskManager configuration

_setup_dirs(out_dir: str, note: Optional[str] = None) Tuple[Dict[str, str], str][source]#

Sets up directories

_perform_backup(**parts)[source]#

Stores the given configuration parts in the config directory

_add_tasks(tasks: dict, defaults: dict, add_task: Callable) int[source]#

Adds all configured run tasks to the WorkerManager’s task queue

_add_run_task(name: str, **_)[source]#

Adds a single run task to the WorkerManager

_add_eval_task(name: str, *, model_name: str, out_dir: str, enabled: bool = True, priority: Optional[int] = None, create_symlinks: bool = False, **eval_task_kwargs)[source]#

Adds a single evaluation task to the WorkerManager.

Parameters
  • name (str) – Name of this task

  • model_name (str) – Model name; required in task, thus already requiring it here.

  • out_dir (str) – The path to the data output directory, i.e. the directory where all plots will end up in. This may be a format string containing any of the following keys: task_name, model_name, timestamp, batch_name (combination of timestamp and the note). Relative paths are evaluated relative to the eval batch run directory.

  • enabled (bool, optional) – If False, will not add this task.

  • priority (int, optional) – Task priority; tasks with smaller value will be picked first.

  • create_symlinks (bool, optional) – Whether to create symlinks that add crosslinks between related directories, e.g.: from the output directory, link back to the task configuration; from the evaluation output directory alongside the simulation data, link to the batch output directory

  • **eval_task_kwargs – All further evaluation task arguments.
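
A minimal sketch of how an out_dir format string as described above could be resolved. The resolve_out_dir helper and its eval_dir argument are hypothetical, and all paths are example values; the actual resolution inside _add_eval_task may differ.

```python
import os


def resolve_out_dir(out_dir: str, *, eval_dir: str, **fstr_kwargs) -> str:
    """Format ``out_dir`` and make relative results relative to ``eval_dir``."""
    path = os.path.expanduser(out_dir.format(**fstr_kwargs))
    if not os.path.isabs(path):
        path = os.path.join(eval_dir, path)
    return path


fstr_kwargs = dict(
    task_name="my_eval_task",
    model_name="MyModel",
    timestamp="180301-125410",
    batch_name="180301-125410_my_note",
)

# The default value '{task_name:}' yields a directory named after the task
relative = resolve_out_dir("{task_name:}", eval_dir="/tmp/batch/eval", **fstr_kwargs)

# Absolute paths are not joined with the eval directory
absolute = resolve_out_dir("/data/{model_name}/{batch_name}", eval_dir="/tmp/batch/eval", **fstr_kwargs)
```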

utopya.cfg module#

Module that coordinates utopya’s persistent config directory

utopya.cfg.UTOPYA_CFG_DIR = '/home/docs/.config/utopya'#

Path to the persistent utopya configuration directory

utopya.cfg.UTOPYA_CFG_FILE_NAMES = {'batch': 'batch_cfg.yml', 'user': 'user_cfg.yml', 'utopya': 'utopya_cfg.yml'}#

Names and paths of valid configuration entries

utopya.cfg.UTOPYA_CFG_FILE_PATHS = {'batch': '/home/docs/.config/utopya/batch_cfg.yml', 'user': '/home/docs/.config/utopya/user_cfg.yml', 'utopya': '/home/docs/.config/utopya/utopya_cfg.yml'}#

Absolute configuration file paths

utopya.cfg.UTOPYA_CFG_SUBDIR_NAMES = {'models': 'models', 'projects': 'projects'}#

Names and paths of valid configuration subdirectories

utopya.cfg.UTOPYA_CFG_SUBDIRS = {'models': '/home/docs/.config/utopya/models', 'projects': '/home/docs/.config/utopya/projects'}#

Absolute paths of the configuration subdirectories

utopya.cfg.PROJECT_INFO_FILE_SEARCH_PATHS = ('.utopya_project.yml', '.utopya-project.yml')#

Potential names of project info files, relative to base directory

utopya.cfg.get_cfg_path(cfg_name: str) str[source]#

Returns the absolute path to the specified configuration file

utopya.cfg.load_from_cfg_dir(cfg_name: str) dict[source]#

Load a configuration file; returns empty dict if no file exists.

Parameters

cfg_name (str) – The name of the configuration to read

Returns

The configuration as read from the config directory; if no file is available, will return an empty dict.

Return type

dict

utopya.cfg.write_to_cfg_dir(cfg_name: str, obj: dict)[source]#

Writes a YAML representation of the given object to the configuration directory. Always overwrites a possibly existing file.

Parameters
  • cfg_name (str) – The configuration name

  • obj (dict) – The yaml-representable object that is to be written; usually a dict.

utopya.exceptions module#

utopya-specific exception types

exception utopya.exceptions.UtopyaException[source]#

Bases: BaseException

Base class for utopya-specific exceptions

exception utopya.exceptions.ValidationError[source]#

Bases: utopya.exceptions.UtopyaException, ValueError

Raised upon failure to validate a parameter

exception utopya.exceptions.WorkerManagerError[source]#

Bases: utopya.exceptions.UtopyaException

The base exception class for WorkerManager errors

exception utopya.exceptions.WorkerManagerTotalTimeout[source]#

Bases: utopya.exceptions.WorkerManagerError

Raised when a total timeout occurred

exception utopya.exceptions.WorkerTaskError[source]#

Bases: utopya.exceptions.WorkerManagerError

Raised when there was an error in a WorkerTask

exception utopya.exceptions.WorkerTaskNonZeroExit(task: utopya.task.WorkerTask, *args, **kwargs)[source]#

Bases: utopya.exceptions.WorkerTaskError

Can be raised when a WorkerTask exited with a non-zero exit code.

__init__(task: utopya.task.WorkerTask, *args, **kwargs)[source]#

Initialize an error handling non-zero exit codes from workers

__str__() str[source]#

Returns information on the error

exception utopya.exceptions.WorkerTaskStopConditionFulfilled(task: utopya.task.WorkerTask, *args, **kwargs)[source]#

Bases: utopya.exceptions.WorkerTaskNonZeroExit

An exception that is raised when a worker-specific stop condition was fulfilled. This allows being handled separately to other non-zero exits.
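
The base classes listed in this module have two practical consequences for exception handling, sketched below. The classes are stand-ins that only mirror the documented inheritance so that the example runs without utopya; unlike the real WorkerTaskNonZeroExit, they take no task argument.

```python
# Stand-in classes mirroring the documented base classes (illustration only)
class UtopyaException(BaseException):
    pass

class WorkerManagerError(UtopyaException):
    pass

class WorkerTaskError(WorkerManagerError):
    pass

class WorkerTaskNonZeroExit(WorkerTaskError):
    pass

class WorkerTaskStopConditionFulfilled(WorkerTaskNonZeroExit):
    pass

class ValidationError(UtopyaException, ValueError):
    pass


def classify(exc: BaseException) -> str:
    """Raise the given exception and report which handler caught it."""
    try:
        raise exc
    except WorkerTaskStopConditionFulfilled:
        # The more specific subclass needs to be handled first
        return "stop condition"
    except WorkerTaskNonZeroExit:
        return "non-zero exit"
    except Exception:
        # Only reached by types that also derive from Exception
        return "regular exception"
    except BaseException:
        return "not an Exception subclass"
```

In this sketch, a ValidationError is caught by except Exception because it also derives from ValueError, whereas a WorkerManagerError is not, because UtopyaException derives from BaseException.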

exception utopya.exceptions.YAMLRegistryError[source]#

Bases: utopya.exceptions.UtopyaException, ValueError

Base class for errors in YAMLRegistry

exception utopya.exceptions.EntryExistsError[source]#

Bases: utopya.exceptions.YAMLRegistryError

Raised if an entry already exists

exception utopya.exceptions.MissingEntryError[source]#

Bases: utopya.exceptions.YAMLRegistryError

Raised if an entry is missing

exception utopya.exceptions.MissingRegistryError[source]#

Bases: utopya.exceptions.YAMLRegistryError

Raised if a registry is missing

exception utopya.exceptions.EntryValidationError[source]#

Bases: utopya.exceptions.YAMLRegistryError

Raised upon failed validation of a registry entry

exception utopya.exceptions.SchemaValidationError[source]#

Bases: utopya.exceptions.YAMLRegistryError

If schema validation failed

exception utopya.exceptions.ModelRegistryError[source]#

Bases: utopya.exceptions.UtopyaException, ValueError

Raised on errors with model registry

exception utopya.exceptions.MissingModelError[source]#

Bases: utopya.exceptions.ModelRegistryError

Raised when a model is missing

exception utopya.exceptions.BundleExistsError[source]#

Bases: utopya.exceptions.ModelRegistryError

Raised when a bundle that compared equal already exists

exception utopya.exceptions.MissingBundleError[source]#

Bases: utopya.exceptions.ModelRegistryError

Raised when a bundle is missing

exception utopya.exceptions.BundleValidationError[source]#

Bases: utopya.exceptions.ModelRegistryError

Raised when the result of validating the existence of a bundle fails

exception utopya.exceptions.ProjectRegistryError[source]#

Bases: utopya.exceptions.UtopyaException, ValueError

Raised on errors with project registry

exception utopya.exceptions.MissingProjectError[source]#

Bases: utopya.exceptions.ProjectRegistryError

Raised on a missing project

exception utopya.exceptions.ProjectExistsError[source]#

Bases: utopya.exceptions.ProjectRegistryError

Raised if a project or project file of that name already exists

utopya.model module#

Provides the Model to work interactively with registered utopya models

class utopya.model.Model(*, name: Optional[str] = None, info_bundle: Optional[utopya.model_registry.info_bundle.ModelInfoBundle] = None, bundle_label: Optional[str] = None, base_dir: Optional[str] = None, sim_errors: Optional[str] = None, use_tmpdir: bool = False)[source]#

Bases: object

A class to work with Utopia models interactively.

It attaches to a certain model and makes it easy to load config files, create a Multiverse from them, run it, and work with it further…

CONFIG_SET_MODEL_SOURCE_SUBDIRS = ('cfgs', 'cfg_sets', 'config_sets')#

Directories within the model source directories to search through when looking for configuration sets. These are not used if the utopya config contains an entry overwriting this.

__init__(*, name: Optional[str] = None, info_bundle: Optional[utopya.model_registry.info_bundle.ModelInfoBundle] = None, bundle_label: Optional[str] = None, base_dir: Optional[str] = None, sim_errors: Optional[str] = None, use_tmpdir: bool = False)[source]#

Initialize the Model for the given model name

Parameters
  • name (str, optional) – Name of the model to attach to. If not given, need to pass info_bundle.

  • info_bundle (ModelInfoBundle, optional) – The required information to work with this model. If not given, will attempt to find the model in the model registry via name or bundle_label.

  • bundle_label (str, optional) – A label to use for identifying the info bundle.

  • base_dir (str, optional) – For convenience, can specify this path which will be seen as the base path for config files; if set, arguments that allow specifying configuration files can specify them relative to this directory.

  • sim_errors (str, optional) – Whether to raise errors from Multiverse

  • use_tmpdir (bool, optional) – Whether to use a temporary directory to write data to. The default value can be set here, but the flag can be overwritten in the create_mv and create_run_load methods. If False, the regular model output directory is used.

Raises

ValueError – Upon bad base_dir

__str__() str[source]#

Returns an informative string for this Model instance

property info_bundle: utopya.model_registry.info_bundle.ModelInfoBundle#

The model info bundle

property name: str#

The name of this Model object, which is at the same time the name of the attached model.

property base_dir: str#

Returns the path to the base directory, if set during init.

This is the path to a directory from which config files can be loaded using relative paths.

property default_model_cfg: dict#

Returns the default model configuration by loading it from the file specified in the info bundle.

property default_config_set_search_dirs: List[str]#

Returns the default config set search directories for this model in the order of precedence:

  • defined on the project-level via cfg_set_abs_search_dirs; these may also be format strings supporting the following set of keys: model_name, project_base_dir, and model_source_dir (if set). If no project is associated, there will be no additional search directories.

  • names of subdirectories relative to the model source directory, defined in cfg_set_model_source_subdirs. If no model source directory is known, no search directories will be added. If no project is associated, a standard set of search directories is used: cfgs, cfg_sets, config_sets, as defined in CONFIG_SET_MODEL_SOURCE_SUBDIRS.

Note

The output may contain relative paths.

property default_config_sets: Dict[str, dict]#

Config sets at the default search locations.

To retrieve an individual config set, consider using get_config_set() instead of this property.

For more information, see Configuration Sets.

create_mv(*, from_cfg: Optional[str] = None, from_cfg_set: Optional[str] = None, run_cfg_path: Optional[str] = None, use_tmpdir: Optional[bool] = None, **update_meta_cfg) utopya.multiverse.Multiverse[source]#

Creates a utopya.multiverse.Multiverse for this model, optionally loading a configuration from a file and updating it with further keys.

Parameters
  • from_cfg (str, optional) – The name of the config file (relative to the base directory) to be used.

  • from_cfg_set (str, optional) – Name of the config set to retrieve the run config from. Mutually exclusive with from_cfg and run_cfg_path.

  • run_cfg_path (str, optional) – The path of the run config to use. Can not be passed if from_cfg or from_cfg_set arguments were given.

  • use_tmpdir (bool, optional) – Whether to use a temporary directory to write the data to. If not given, uses default value set at initialization.

  • **update_meta_cfg – Can be used to update the meta configuration

Returns

The created Multiverse object

Return type

Multiverse

Raises

ValueError – If more than one of the run config selecting arguments (from_cfg, from_cfg_set, run_cfg_path) were given.
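
The mutual exclusivity of the three run config arguments can be sketched as follows. The select_run_cfg function is a hypothetical stand-in for the check described above and does not load any configuration.

```python
from typing import Optional, Tuple


def select_run_cfg(
    *,
    from_cfg: Optional[str] = None,
    from_cfg_set: Optional[str] = None,
    run_cfg_path: Optional[str] = None,
) -> Optional[Tuple[str, str]]:
    """Return the single given (argument name, value) pair, if any."""
    candidates = dict(
        from_cfg=from_cfg, from_cfg_set=from_cfg_set, run_cfg_path=run_cfg_path
    )
    given = {k: v for k, v in candidates.items() if v is not None}
    if len(given) > 1:
        raise ValueError(
            "Can only pass one of the run config selecting arguments, "
            f"but got: {', '.join(given)}"
        )
    # None if no argument was given at all
    return next(iter(given.items()), None)
```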

create_frozen_mv(**fmv_kwargs) utopya.multiverse.FrozenMultiverse[source]#

Create a utopya.multiverse.FrozenMultiverse, coupling it to a run directory.

Use this method if you want to load an existing simulation run.

Parameters

**fmv_kwargs – Passed on to utopya.multiverse.FrozenMultiverse.__init__()

create_run_load(*, from_cfg: Optional[str] = None, run_cfg_path: Optional[str] = None, from_cfg_set: Optional[str] = None, use_tmpdir: Optional[bool] = None, print_tree: bool = True, **update_meta_cfg) Tuple[utopya.multiverse.Multiverse, utopya.eval.datamanager.DataManager][source]#

Chains the create_mv(), mv.run, and mv.dm.load_from_cfg method calls together and returns a (Multiverse, DataManager) tuple.

Parameters
  • from_cfg (str, optional) – The name of the config file (relative to the base directory) to be used.

  • from_cfg_set (str, optional) – Name of the config set to retrieve the run config from. Mutually exclusive with from_cfg and run_cfg_path.

  • run_cfg_path (str, optional) – The path of the run config to use. Can not be passed if from_cfg or from_cfg_set arguments were given.

  • use_tmpdir (bool, optional) – Whether to use a temporary directory to write the data to. If not given, uses default value set at initialization.

  • print_tree (bool, optional) – Whether to print the loaded data tree

  • **update_meta_cfg – Arguments passed to the create_mv function

Returns

The created Multiverse and the corresponding DataManager (with data already loaded).

Return type

Tuple[Multiverse, DataManager]

get_config_set(name: Optional[str] = None) Dict[str, str][source]#

Returns a configuration set: a dict containing paths to run and/or eval configuration files. These are accessible via the keys run and eval.

Config sets are retrieved from multiple locations:

  • The cfgs directory in the model’s source directory

  • The user-specified lookup directories, specified in the utopya configuration as config_set_search_dirs

  • If name is an absolute or relative path, and a directory exists at the specified location, the parent directory is interpreted as a search path.

This uses get_config_sets() to retrieve all available configuration sets from the above paths and then selects the one with the given name. Config sets that are found later overwrite those with the same name found in previous searches and log a warning message (which can be controlled with the warn argument); sets are not merged.

For more information, see Configuration Sets.

Parameters

name (str, optional) – The name of the config set to retrieve. This may also be a local path, which is looked up prior to the default search directories.

get_config_sets(*, search_dirs: Optional[List[str]] = None, warn: bool = True, cfg_sets: Optional[dict] = None) Dict[str, dict][source]#

Searches for all available configuration sets in the given search directories, aggregating them into one dict.

The search is done in reverse order of the paths given in search_dirs, i.e. starting from those directories with the lowest precedence. If configuration sets with the same name are encountered, warnings are emitted, but the one with higher precedence (appearing more towards the front of search_dirs, i.e. the later-searched one) will take precedence.

Note

This will not merge configuration sets from different search directories, e.g. if one contained only an eval configuration and the other contained only a run configuration, a warning will be emitted but the entry from the later-searched directory will be used.

Parameters
  • search_dirs (List[str], optional) – The directories to search sequentially for config sets. If not given, will use the default config set search directories, see default_config_set_search_dirs.

  • warn (bool, optional) – Whether to warn (via log message), if the search yields a config set with a name that already existed.

  • cfg_sets (dict, optional) – If given, aggregate newly found config sets into this dict. Otherwise, start with an empty one.
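
The precedence rule described above can be sketched with plain dicts. The aggregate_config_sets function and its found argument, which stands in for the result of searching each directory on disk, are hypothetical; the directory and file names are example values.

```python
import logging
from typing import Dict, List

log = logging.getLogger(__name__)


def aggregate_config_sets(
    search_dirs: List[str],
    found: Dict[str, Dict[str, dict]],
    *,
    warn: bool = True,
) -> Dict[str, dict]:
    """Aggregate config sets, earlier entries in search_dirs taking precedence."""
    cfg_sets = {}
    # Search in reverse order, i.e. lowest precedence first
    for search_dir in reversed(search_dirs):
        for name, cfg_set in found.get(search_dir, {}).items():
            if name in cfg_sets and warn:
                log.warning("Config set '%s' already exists, overwriting.", name)
            # The entry is replaced as a whole, not merged
            cfg_sets[name] = cfg_set
    return cfg_sets


found = {
    "high_prio": {"demo": {"run": "high_prio/demo/run.yml"}},
    "low_prio": {
        "demo": {"eval": "low_prio/demo/eval.yml"},
        "other": {"run": "low_prio/other/run.yml"},
    },
}
cfg_sets = aggregate_config_sets(["high_prio", "low_prio"], found)
```

In this example, the demo config set ends up with only the run entry from high_prio; the eval entry from low_prio is not merged in.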

_store_mv(mv: utopya.multiverse.Multiverse, **kwargs) None[source]#

Stores a created Multiverse object and all the kwargs in a dict

_create_tmpdir() tempfile.TemporaryDirectory[source]#

Create a TemporaryDirectory

_find_config_sets(search_dir: str, *, cfg_sets: dict, warn: bool = True) Dict[str, dict][source]#

Looks for config sets in the given directory and aggregates them into the given cfg_sets dict, warning if an entry already exists.

Parameters
  • search_dir (str) – The directory to search for configuration sets. Can be an absolute or relative path; ~ is expanded.

  • cfg_sets (dict) – The dict to populate with the results, each entry being one config set.

  • warn (bool, optional) – Whether to warn (via log message) if an entry already exists.

utopya.multiverse module#

Implementation of the Multiverse class, which sits at the heart of utopya and supplies the main user interface for the frontend. It allows running a simulation and then evaluating it.

class utopya.multiverse.Multiverse(*, model_name: Optional[str] = None, info_bundle: Optional[utopya.model_registry.info_bundle.ModelInfoBundle] = None, run_cfg_path: Optional[str] = None, user_cfg_path: Optional[str] = None, _shared_worker_manager: Optional[utopya.workermanager.WorkerManager] = None, **update_meta_cfg)[source]#

Bases: object

The Multiverse is where a single simulation run is orchestrated from.

It spawns multiple universes, each of which represents a single simulation of the selected model with the parameters specified by the meta configuration. The WorkerManager takes care to perform these simulations in parallel.

The Multiverse then interfaces with the dantro data processing pipeline using classes specialized in utopya.eval: The DataManager loads the created simulation output, making it available in a uniformly accessible and hierarchical data tree. Then, the PlotManager handles plotting of that data.

BASE_META_CFG_PATH = '/home/docs/checkouts/readthedocs.org/user_builds/utopya/checkouts/latest/utopya/cfg/base_cfg.yml'#

Where the default meta configuration can be found

USER_CFG_SEARCH_PATH = '/home/docs/.config/utopya/user_cfg.yml'#

Where to look for the user configuration

RUN_DIR_TIME_FSTR = '%y%m%d-%H%M%S'#

The time format string for the run directory

UTOPYA_BASE_PLOTS_PATH = '/home/docs/checkouts/readthedocs.org/user_builds/utopya/checkouts/latest/utopya/cfg/base_plots.yml'#

Where the utopya base plots configuration can be found; this is passed to the PlotManager.

__init__(*, model_name: Optional[str] = None, info_bundle: Optional[utopya.model_registry.info_bundle.ModelInfoBundle] = None, run_cfg_path: Optional[str] = None, user_cfg_path: Optional[str] = None, _shared_worker_manager: Optional[utopya.workermanager.WorkerManager] = None, **update_meta_cfg)[source]#

Initialize the Multiverse.

Parameters
  • model_name (str, optional) – The name of the model to run

  • info_bundle (ModelInfoBundle, optional) – The model information bundle that includes information about the executable path etc. If not given, will attempt to read it from the model registry.

  • run_cfg_path (str, optional) – The path to the run configuration.

  • user_cfg_path (str, optional) – If given, this is used to update the base configuration. If None, will look for it in the default path, see Multiverse.USER_CFG_SEARCH_PATH.

  • _shared_worker_manager (WorkerManager, optional) –

    If given, this already existing WorkerManager instance (and its reporter) will be used instead of initializing new instances.

    Warning

    This argument is only exposed for internal purposes. It should not be used for production code and behavior of this argument may change at any time.

  • **update_meta_cfg – Can be used to update the meta configuration generated from the previous configuration levels

property debug_level: int#

The debug level

property info_bundle: utopya.model_registry.info_bundle.ModelInfoBundle#

The model info bundle for this Multiverse

property model_name: str#

The model name associated with this Multiverse

property model_executable: str#

The path to the model executable

property model: utopya.model.Model#

A model instance, created ad-hoc using the associated info bundle

property meta_cfg: dict#

The meta configuration.

property dirs: dict#

Information on managed directories.

property cluster_mode: bool#

Whether the Multiverse should run in cluster mode

property cluster_params: dict#

Returns a copy of the cluster mode configuration parameters

property resolved_cluster_params: dict#

Returns a copy of the cluster configuration with all parameters resolved. This makes some additional keys available on the top level.

property dm: utopya.eval.datamanager.DataManager#

The Multiverse’s DataManager.

property wm: utopya.workermanager.WorkerManager#

The Multiverse’s WorkerManager.

property pm: utopya.eval.plotmanager.PlotManager#

The Multiverse’s PlotManager.

run(*, sweep: Optional[bool] = None)[source]#

Starts a Utopia simulation run.

Specifically, this method adds simulation tasks to the associated WorkerManager, locks its task list, and then invokes the start_working() method which performs all the simulation tasks.

If cluster mode is enabled, this will split up the parameter space into (ideally) equally sized parts and only run one of these parts, depending on the cluster node this Multiverse is being invoked on.

Note

As this method locks the task list of the WorkerManager, no further tasks can be added henceforth. This means that each Multiverse instance can only perform a single simulation run.

Parameters

sweep (bool, optional) – Whether to perform a sweep or not. If None, the value will be read from the perform_sweep key of the meta-configuration.
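
To illustrate the cluster mode behaviour of run(), the sketch below splits a task list into nearly equal parts, one per node. The strided selection and the names select_tasks_for_node, node_index and num_nodes are assumptions made for this example; the strategy actually used by the Multiverse may differ.

```python
def select_tasks_for_node(tasks: list, *, node_index: int, num_nodes: int) -> list:
    """Select every num_nodes-th task, starting at this node's index."""
    return tasks[node_index::num_nodes]


tasks = list(range(10))  # stand-in for the points of a parameter space
parts = [
    select_tasks_for_node(tasks, node_index=i, num_nodes=3) for i in range(3)
]
print(parts)  # [[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]]
```

Every task is assigned to exactly one node, and the part sizes differ by at most one.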

run_single()[source]#

Runs a single simulation using the parameter space’s default value.

See run() for more information.

run_sweep()[source]#

Runs a parameter sweep.

See run() for more information.

renew_plot_manager(**update_kwargs)[source]#

Tries to set up a new PlotManager. If this succeeds, the old one is discarded and the new one is associated with this Multiverse.

Parameters

**update_kwargs – Passed on to PlotManager.__init__

_create_meta_cfg(*, run_cfg_path: str, user_cfg_path: str, update_meta_cfg: dict) dict[source]#

Create the meta configuration from several parts and store it.

The final configuration dict is built from multiple components, where one is always recursively updating the previous level. The resulting configuration is the meta configuration and is stored to the meta_cfg attribute.

The parts are recorded in the cfg_parts dict and returned such that a backup can be created.

Parameters
  • run_cfg_path (str) – path to the run configuration

  • user_cfg_path (str) – path to the user configuration file

  • update_meta_cfg (dict) – will be used to update the resulting dict

Returns

dict of the parts that were needed to create the meta config.

The dict-key corresponds to the part name, the value is the payload which can be either a path to a cfg file or a dict

Return type

dict

_apply_debug_level(lvl: Optional[int] = None)[source]#

Depending on the debug level, applies certain settings to the Multiverse and the runtime environment.

Note

This does not (yet) set the corresponding debug flags for the PlotManager, DataManager, or WorkerManager!

_create_run_dir(*, out_dir: str, model_note: Optional[str] = None) None[source]#

Create the folder structure for the run output.

For the chosen model name and current timestamp, the run directory will be of form <timestamp>_<model_note> and be part of the following directory tree:

utopya_output
    model_a
        180301-125410_my_model_note
            config
            data
                uni000
                uni001
                ...
            eval
    model_b
        180301-125412_my_first_sim
        180301-125413_my_second_sim

If running in cluster mode, the cluster parameters are resolved and used to determine the name of the simulation. The pattern then does not include a timestamp as each node might return not quite the same value. Instead, a value from an environment variable is used. The resulting path can have different forms, depending on which environment variables were present; required parts are denoted by a * in the following pattern; if the value of the other entries is not available, the connecting underscore will not be used:

{timestamp}_{job id*}_{cluster}_{job account}_{job name}_{note}
Parameters
  • out_dir (str) – The base output directory, where all Utopia output is stored.

  • model_note (str, optional) – The note to add to the run directory of the current run.

Raises

RuntimeError – If the simulation directory already existed. This should not occur, as the timestamp is unique. If it occurs, you either started two simulations very close to each other or something is seriously wrong. Strange time zone perhaps?
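How the cluster-mode name pattern drops unavailable parts can be sketched as below. This is a hedged illustration only: the function build_cluster_run_dir_name and its argument names are hypothetical, and how the individual values are read from environment variables is not shown.

```python
from typing import Optional


def build_cluster_run_dir_name(
    *,
    job_id: str,
    timestamp: Optional[str] = None,
    cluster: Optional[str] = None,
    job_account: Optional[str] = None,
    job_name: Optional[str] = None,
    note: Optional[str] = None,
) -> str:
    """Join the available name parts with underscores.

    Only ``job_id`` is required; parts that are None or empty are dropped
    together with their connecting underscore.
    """
    if not job_id:
        raise ValueError("The job ID is required to build the directory name!")

    parts = [timestamp, job_id, cluster, job_account, job_name, note]
    return "_".join(str(p) for p in parts if p)


name = build_cluster_run_dir_name(
    timestamp="180301-125410", job_id="123456", job_name="sweep"
)
print(name)  # 180301-125410_123456_sweep
```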

_setup_pm(**update_kwargs) utopya.eval.plotmanager.PlotManager[source]#

Helper function to setup a PlotManager instance

_parse_base_cfg_pools(base_cfg_pools: List[Union[str, Tuple[str, Union[dict, str]]]]) List[Tuple[str, Union[dict, str]]][source]#

Prepares the base_cfg_pools argument to be valid input to the PlotManager. This method resolves format strings and thus allows base config pools to be defined more generically.

Possible formats for each entry of base_cfg_pools argument are:

  • A 2-tuple (name, pool dict) which specifies the name of the base config pool along with the pool entries.

  • A 2-tuple (name, path to pool config file), which is later loaded by the PlotManager

  • A shortcut key which resolves to the corresponding 2-tuple. Available shortcuts are: utopya_base, framework_base, project_base, and model_base.

Both the pool name and path may be format strings which get resolved with the model_name key and (in the case of the path) the full paths dict of the current model’s info bundle. A format string may look like this:

{paths[source_dir]}/{model_name}_more_plots.yml
~/some/more/plots/{model_name}/plots.yml

If such a path cannot be resolved, an error is logged and an empty pool is used instead; this allows for more flexibility in defining locations for additional config pools.

Parameters

base_cfg_pools (List[Union[str, Tuple[str, Union[str, dict]]]]) – The unparsed specification of config pools.
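The format string resolution described above relies on standard Python string formatting, where an entry like {paths[source_dir]} looks up a key in the passed mapping. A minimal sketch, assuming a hypothetical helper resolve_pool_entry and made-up values for model_name and paths (the error handling for unresolvable paths is not reproduced here):

```python
from typing import Tuple


def resolve_pool_entry(
    name: str, path: str, *, model_name: str, paths: dict
) -> Tuple[str, str]:
    """Resolve format strings in a (name, path) base config pool entry.

    The name only has access to ``model_name``; the path can additionally
    refer to entries of the ``paths`` dict.
    """
    name = name.format(model_name=model_name)
    path = path.format(model_name=model_name, paths=paths)
    return name, path


# Hypothetical model information
model_name = "MyModel"
paths = {"source_dir": "/path/to/models/MyModel"}

entry = resolve_pool_entry(
    "{model_name}_extra",
    "{paths[source_dir]}/{model_name}_more_plots.yml",
    model_name=model_name,
    paths=paths,
)
print(entry)
```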

_perform_backup(*, cfg_parts: dict, backup_cfg_files: bool = True, backup_executable: bool = False, include_git_info: bool = True) None[source]#

Performs a backup of the information that can be used to recreate a simulation.

The configuration files are backed up into the config subdirectory of the run directory. All other relevant information is stored in an additionally created backup subdirectory.

Warning

These backups are created prior to the start of the actual simulation run and contain information known at that point. Any changes to the meta configuration made after initialization of the Multiverse will not be reflected in these backups.

In particular, the perform_sweep and parameter_space entries of the meta configuration may not reflect which form of parameter space iteration was actually performed, because the run_single and run_sweep methods overwrite this behavior. To that end, that information is separately stored once the run methods are invoked.

Parameters
  • cfg_parts (dict) – A dict of either paths to configuration files or dict-like data that is to be dumped into a configuration file.

  • backup_cfg_files (bool, optional) – Whether to backup the individual configuration files (i.e. the cfg_parts information). If false, the meta configuration will still be backed up.

  • backup_executable (bool, optional) – Whether to backup the executable. Note that these files can sometimes be quite large.

  • include_git_info (bool, optional) – If True, will store information about the state of the project’s (and framework’s, if existent) git repository.

_perform_pspace_backup(pspace: paramspace.paramspace.ParamSpace, *, filename: str = 'parameter_space', **info_kwargs)[source]#

Stores the given parameter space and its metadata into the config directory. Two files will be produced:

  • config/{filename}.yml: the passed pspace object

  • config/{filename}_info.yml: the passed pspace object’s info dictionary containing relevant metadata (and the additionally passed info_kwargs)

Note

This method is separated from the regular backup method _perform_backup() because the parameter space that is used during a simulation run may be a lower-dimensional version of the one the Multiverse was initialized with. To that end, run() will invoke this backup function again once the relevant information is fully determined. This is important because it is needed to communicate the correct information about the sweep to objects downstream in the pipeline (e.g. MultiversePlotCreator).

Parameters
  • pspace (paramspace.paramspace.ParamSpace) – The ParamSpace object to save as backup.

  • filename (str, optional) – The filename (without extension!) to use. (This is also used for the log message, with underscores changed to spaces.)

  • **info_kwargs – Additional kwargs that are to be stored in the metadata dict.

_prepare_executable(*, run_from_tmpdir: bool = False) None[source]#

Prepares the model executable, potentially copying it to a temporary location.

Note that run_from_tmpdir requires the executable to be relocatable to another location, i.e. be position-independent.

Parameters

run_from_tmpdir (bool, optional) – Whether to copy the executable to a temporary directory that goes out of scope once the Multiverse instance goes out of scope.

Raises
_resolve_cluster_params() dict[source]#

This resolves the cluster parameters, e.g. by setting parameters depending on certain environment variables. This function is called by the resolved_cluster_params property.

Returns

The resolved cluster configuration parameters

Return type

dict

Raises

ValueError – If a required environment variable was missing or empty

_add_sim_task(*, uni_id_str: str, uni_cfg: dict, is_sweep: bool) None[source]#

Helper function that handles task assignment to the WorkerManager.

This function creates a WorkerTask that will perform the following actions once it is grabbed and worked at:

  • Create a universe (folder) for the task (simulation)

  • Write that universe’s configuration to a yaml file in that folder

  • Create the correct arguments for the call to the model binary

To that end, this method gathers all necessary arguments and registers a WorkerTask with the WorkerManager.

Parameters
  • uni_id_str (str) – The zero-padded uni id string

  • uni_cfg (dict) – given by ParamSpace. Defines how many simulations should be started

  • is_sweep (bool) – Flag is needed to distinguish between sweeps and single simulations. With this information, the forwarding of a simulation’s output stream can be controlled.

Raises

RuntimeError – If adding the simulation task failed

_add_sim_tasks(*, sweep: Optional[bool] = None) int[source]#

Adds the simulation tasks needed for a single run or for a sweep.

Parameters

sweep (bool, optional) – Whether tasks for a parameter sweep should be added or only for a single universe. If None, will read the perform_sweep key from the meta-configuration.

Returns

The number of added tasks.

Return type

int

Raises

ValueError – On sweep == True and zero-volume parameter space.

_validate_meta_cfg() bool[source]#

Goes through the parameters that require validation, validates them, and creates a useful error message if there were invalid parameters.

Returns

True if all parameters are valid; None if no check was done.

Note that False will never be returned, but a ValidationError will be raised instead.

Return type

bool

Raises

ValidationError – If validation failed.

class utopya.multiverse.FrozenMultiverse(*, model_name: Optional[str] = None, info_bundle: Optional[utopya.model_registry.info_bundle.ModelInfoBundle] = None, run_dir: Optional[str] = None, run_cfg_path: Optional[str] = None, user_cfg_path: Optional[str] = None, use_meta_cfg_from_run_dir: bool = False, **update_meta_cfg)[source]#

Bases: utopya.multiverse.Multiverse

A frozen Multiverse is like a Multiverse, but frozen.

It is initialized from a finished Multiverse run and re-creates all the attributes from that data, e.g.: the meta configuration, a DataManager, and a PlotManager.

Note

A frozen multiverse is no longer able to perform any simulations.

__init__(*, model_name: Optional[str] = None, info_bundle: Optional[utopya.model_registry.info_bundle.ModelInfoBundle] = None, run_dir: Optional[str] = None, run_cfg_path: Optional[str] = None, user_cfg_path: Optional[str] = None, use_meta_cfg_from_run_dir: bool = False, **update_meta_cfg)[source]#

Initializes the FrozenMultiverse from a model name and the name of a run directory.

Note that this also takes arguments to specify the run configuration to use.

Parameters
  • model_name (str) – The name of the model to load. From this, the model output directory is determined and the run_dir will be seen as relative to that directory.

  • info_bundle (ModelInfoBundle, optional) – The model information bundle that includes information about the binary path etc. If not given, will attempt to read it from the model registry.

  • run_dir (str, optional) – The run directory to load. Can be a path relative to the current working directory, an absolute path, or the timestamp of the run directory. If not given, will use the most recent timestamp.

  • run_cfg_path (str, optional) – The path to the run configuration.

  • user_cfg_path (str, optional) – If given, this is used to update the base configuration. If None, will look for it in the default path, see Multiverse.USER_CFG_SEARCH_PATH.

  • use_meta_cfg_from_run_dir (bool, optional) – If True, will load the meta configuration from the given run directory; only works for absolute run directories.

  • **update_meta_cfg – Can be used to update the meta configuration generated from the previous configuration levels

_create_run_dir(*, out_dir: str, run_dir: str, **__)[source]#

Helper function to find the run directory from arguments given to __init__().

Overwrites the method from the parent Multiverse class, because the FrozenMultiverse does not require setting up a new run directory but should instead identify the existing one and create an appropriate output directory.

Parameters
  • out_dir (str) – The output directory

  • run_dir (str) – The run directory to use

  • **__ – ignored

Raises
  • IOError – No directory found to use as run directory

  • TypeError – When run_dir was not a string

utopya.parameter module#

This module implements the Parameter class which is used when validating model and simulation parameters.

class utopya.parameter.Parameter(*, default: Any, name: Optional[str] = None, description: Optional[str] = None, is_any_of: Optional[Sequence[Any]] = None, limits: Optional[Tuple[Optional[float], Optional[float]]] = None, limits_mode: str = '[]', dtype: Optional[Union[str, type]] = None)[source]#

Bases: object

The parameter class is used when a model parameter needs to be validated before commencing the model run. It can hold information on the parameter itself as well as its valid range and type and other meta-data.

By default, the Parameter class should be assumed to handle scalar parameters like numerical values or strings. For validating sequence-like parameters, corresponding specializing classes are to be implemented.

SHORTHAND_MODES: Dict[str, Callable] = {'is-bool': <function Parameter.<lambda>>, 'is-in-unit-interval': <function Parameter.<lambda>>, 'is-int': <function Parameter.<lambda>>, 'is-negative': <function Parameter.<lambda>>, 'is-negative-int': <function Parameter.<lambda>>, 'is-negative-or-zero': <function Parameter.<lambda>>, 'is-positive': <function Parameter.<lambda>>, 'is-positive-int': <function Parameter.<lambda>>, 'is-positive-or-zero': <function Parameter.<lambda>>, 'is-probability': <function Parameter.<lambda>>, 'is-string': <function Parameter.<lambda>>, 'is-unsigned': <function Parameter.<lambda>>}#

Shorthand mode factory functions. These are used in the from_shorthand() class method to generate a Parameter object more easily.

Also, utopya.yaml registers each of these shorthand modes as a YAML constructor for tag !<mode>.

LIMIT_COMPS: Dict[str, Callable] = {'(': <built-in function gt>, ')': <built-in function lt>, '[': <built-in function ge>, ']': <built-in function le>}#

Comparators for the limits check, depending on limits_mode

LIMIT_MODES: Sequence[str] = ('[]', '()', '[)', '(]')#

Possible limit modes
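How the comparators and limit modes above can work together is sketched below. The mapping mirrors the LIMIT_COMPS value shown above; the function within_limits is a hypothetical helper for illustration and not part of the Parameter class.

```python
import operator
from typing import Optional, Tuple

# Same mapping as shown for LIMIT_COMPS above
LIMIT_COMPS = {
    "(": operator.gt,
    ")": operator.lt,
    "[": operator.ge,
    "]": operator.le,
}


def within_limits(
    value: float,
    limits: Tuple[Optional[float], Optional[float]],
    limits_mode: str = "[]",
) -> bool:
    """Check ``value`` against (lower, upper) limits.

    The first character of ``limits_mode`` selects the comparator for the
    lower bound, the second one that for the upper bound. A bound of None
    is treated as unbounded.
    """
    lower, upper = limits
    lower_ok = lower is None or LIMIT_COMPS[limits_mode[0]](value, lower)
    upper_ok = upper is None or LIMIT_COMPS[limits_mode[1]](value, upper)
    return lower_ok and upper_ok


print(within_limits(0.0, (0.0, 1.0), "[]"))  # True
print(within_limits(0.0, (0.0, 1.0), "(]"))  # False
```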

yaml_tag: str = '!param'#

Default YAML tag to use for representing

__init__(*, default: Any, name: Optional[str] = None, description: Optional[str] = None, is_any_of: Optional[Sequence[Any]] = None, limits: Optional[Tuple[Optional[float], Optional[float]]] = None, limits_mode: str = '[]', dtype: Optional[Union[str, type]] = None)[source]#

Creates a new Parameter object, which holds a default value as well as some constraints on the possible values this parameter can assume.

Parameters
  • default (Any) – the default value of the parameter.

  • name (str, optional) – the name of the parameter.

  • description (str, optional) – a description of this parameter or its effects.

  • is_any_of (Sequence[Any], optional) – a sequence of possible values this parameter can assume. If this parameter is given, limits cannot be used.

  • limits (Tuple[Optional[float], Optional[float]], optional) – the lower and upper bounds of the parameter (only applicable to scalar numerals). If a bound is None, it is assumed to be negative or positive infinity, respectively. Whether boundary values are included in the interval is controlled by the limits_mode argument. This argument is mutually exclusive with is_any_of!

  • limits_mode (str, optional) – whether to interpret the limits as an open, closed, or semi-closed interval. Possible values: '[]' (closed, default), '()' (open), '[)', and '(]'.

  • dtype (Union[str, type], optional) – expected data type of this parameter. Accepts all strings that are accepted by numpy.dtype, e.g. int, float, uint16, string.

Raises
  • TypeError – On a limits argument that was not tuple-like or if a limits argument was given but the default was a

  • ValueError – if an invalid limits_mode is passed, if limits and is_any_of are both passed, or if the limits argument did not have length 2.

__eq__(other) bool[source]#

Returns True for parameters with equal behavior.

property default#

The default value for this parameter

property name#

The name of this parameter

property description#

The description of this parameter

property limits: Optional[tuple]#

The limits of this parameter

property limits_mode: str#

The mode used when evaluating the limits

property is_any_of: Tuple[Any]#

Possible values this parameter may assume

property dtype: Optional[numpy.dtype]#

The expected data type of this parameter

validate(value: Any, *, raise_exc: bool = True) bool[source]#

Checks whether the given value would be a valid parameter.

The checks for the corresponding arguments are carried out in the following order:

  1. is_any_of

  2. dtype

  3. limits

The data type is checked according to the numpy type hierarchy, see docs. To reduce strictness, the following additional compatibilities are taken into account:

  • for unsigned integer dtype, a signed integer-type value is compatible if value >= 0

  • for floating-point dtype, integer-type values are always considered compatible

  • for floating-point dtype, values of all floating-point types are considered compatible, even if they have a lower precision (note the coercion test below, though)

Additionally, it is checked whether value is representable as the target data type. This is done by coercing value to dtype and then checking for equality (using np.isclose).

Parameters
  • value (Any) – The value to test.

  • raise_exc (bool, optional) – Whether to raise an exception or not.

Returns

Whether or not the given value is a valid parameter.

Return type

bool

Raises

ValidationError – If validation failed or is impossible (for instance due to ambiguous validity parameters). This error message contains further information on why validation failed.
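The coercion test mentioned above (coerce the value to the target type, then compare with the original) can be illustrated without numpy. The sketch below is an assumption-laden stand-in: it uses built-in Python types instead of numpy dtypes and math.isclose in place of np.isclose, and representable_as is a hypothetical helper name.

```python
import math
from typing import Any


def representable_as(value: Any, target_type: type) -> bool:
    """Whether ``value`` survives coercion to ``target_type`` unchanged.

    Stand-in for the numpy-based check: coerce, then compare the coerced
    value with the original one using a closeness test.
    """
    try:
        coerced = target_type(value)
        return math.isclose(coerced, value)
    except (TypeError, ValueError, OverflowError):
        # Coercion or comparison was not possible at all
        return False


print(representable_as(3.0, int))  # True
print(representable_as(3.5, int))  # False
print(representable_as(2, float))  # True
```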

classmethod from_shorthand(default: Any, *, mode: str, **kwargs)[source]#

Constructs a Parameter object from a given shorthand mode.

Parameters
  • default (Any) – the default value for the parameter

  • mode (str) – A valid shorthand mode, see SHORTHAND_MODES

  • **kwargs – any further arguments for Parameter initialization, see __init__().

Returns

a Parameter object

classmethod to_yaml(representer, node)[source]#

Represent this Parameter object as a YAML mapping.

Parameters
  • representer (ruamel.yaml.representer) – The representer module

  • node (Parameter) – The node, i.e. an instance of this class

Returns

a yaml mapping that is able to recreate this object

classmethod from_yaml(constructor, node)[source]#

The default constructor for Parameter objects, expecting a YAML node that is mapping-like.

utopya.parameter.extract_validation_objects(model_cfg: dict, *, model_name: str) Tuple[dict, dict][source]#

Extracts all Parameter objects from a model configuration (a nested dict), replacing them with their default values. Returns both the modified model configuration as well as the Parameter objects (keyed by the key sequence necessary to reach them within the model configuration).

Parameters
  • model_cfg (dict) – the model configuration to inspect

  • model_name (str) – the name of the model

Returns

a tuple of (model config, parameters to validate).

The model config contains the passed config dict in which all Parameter class elements have been replaced by their default entries. The second entry is a dictionary consisting of the Parameter class objects (requiring validation) with keys being key sequences to those Parameter objects. Note that the key sequence is relative to the level above the model configuration, with model_name as a common entry for all returned values.

Return type

Tuple[dict, dict]
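The extraction described above amounts to a recursive walk over the nested configuration. The following sketch illustrates the idea under stated assumptions: Param is a hypothetical stand-in for the Parameter class, extract_params is not the actual implementation, and unlike the documented function it builds a new dict rather than modifying the given one.

```python
from dataclasses import dataclass
from typing import Any, Dict, Tuple


@dataclass
class Param:
    """Hypothetical stand-in for utopya.parameter.Parameter."""

    default: Any


def extract_params(
    cfg: dict, *, model_name: str
) -> Tuple[dict, Dict[tuple, Param]]:
    """Replace Param objects by their defaults and collect them.

    Returns the cleaned configuration and a dict that maps key sequences
    (starting with ``model_name``) to the extracted Param objects.
    """
    to_validate: Dict[tuple, Param] = {}

    def walk(d: dict, key_seq: tuple) -> dict:
        out = {}
        for key, val in d.items():
            if isinstance(val, Param):
                to_validate[key_seq + (key,)] = val
                out[key] = val.default
            elif isinstance(val, dict):
                out[key] = walk(val, key_seq + (key,))
            else:
                out[key] = val
        return out

    return walk(cfg, (model_name,)), to_validate


cfg = {"growth": Param(0.1), "space": {"size": Param(32), "periodic": True}}
model_cfg, params = extract_params(cfg, model_name="MyModel")
```

Here, model_cfg holds plain default values, while params is keyed by tuples such as ("MyModel", "space", "size").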

utopya.plotting module#

DEPRECATED module that provides backwards-compatibility for the old utopya module structure.

Deprecated since version 1.0.0: This module will be removed soon, please use utopya.eval instead.

utopya.project_registry module#

Implementation of the utopya project registry

class utopya.project_registry.ProjectPaths(*, base_dir: pydantic.types.DirectoryPath, project_info: Optional[pydantic.types.FilePath] = None, models_dir: Optional[pydantic.types.DirectoryPath] = None, py_tests_dir: Optional[pydantic.types.DirectoryPath] = None, py_plots_dir: Optional[pydantic.types.DirectoryPath] = None, mv_project_cfg: Optional[pydantic.types.FilePath] = None, project_base_plots: Optional[pydantic.types.FilePath] = None)[source]#

Bases: utopya._yaml_registry.entry.BaseSchema

Schema to use for a project’s paths field

base_dir: pydantic.types.DirectoryPath#
project_info: Optional[pydantic.types.FilePath]#
models_dir: Optional[pydantic.types.DirectoryPath]#
py_tests_dir: Optional[pydantic.types.DirectoryPath]#
py_plots_dir: Optional[pydantic.types.DirectoryPath]#
mv_project_cfg: Optional[pydantic.types.FilePath]#
project_base_plots: Optional[pydantic.types.FilePath]#
_abc_impl = <_abc_data object>#
class utopya.project_registry.ProjectMetadata(*, version: Optional[str] = None, long_name: Optional[str] = None, description: Optional[str] = None, long_description: Optional[str] = None, license: Optional[str] = None, authors: Optional[List[str]] = None, email: Optional[str] = None, website: Optional[str] = None, utopya_compatibility: Optional[str] = None, language: Optional[str] = None, requirements: Optional[List[str]] = None, misc: Optional[Dict[str, Any]] = None)[source]#

Bases: utopya._yaml_registry.entry.BaseSchema

Schema to use for a project’s metadata field

version: Optional[str]#
long_name: Optional[str]#
description: Optional[str]#
long_description: Optional[str]#
license: Optional[str]#
authors: Optional[List[str]]#
email: Optional[str]#
website: Optional[str]#
utopya_compatibility: Optional[str]#
language: Optional[str]#
requirements: Optional[List[str]]#
misc: Optional[Dict[str, Any]]#
_abc_impl = <_abc_data object>#
class utopya.project_registry.ProjectSettings(*, preload_project_py_plots: Optional[bool] = None, preload_framework_py_plots: Optional[bool] = None)[source]#

Bases: utopya._yaml_registry.entry.BaseSchema

Schema to use for a project’s settings field

preload_project_py_plots: Optional[bool]#

Whether to preload the project-level plot module (py_plots_dir) after initialization of the PlotManager. If not given, will load the module.

preload_framework_py_plots: Optional[bool]#

Whether to preload the framework-level plot module (py_plots_dir) after initialization of the PlotManager. If not given, will load the module.

_abc_impl = <_abc_data object>#
class utopya.project_registry.ProjectSchema(*, project_name: str, framework_name: Optional[str] = None, paths: utopya.project_registry.ProjectPaths, metadata: utopya.project_registry.ProjectMetadata, settings: utopya.project_registry.ProjectSettings = {}, run_cfg_format: str = 'yaml', cfg_set_abs_search_dirs: Optional[List[str]] = None, cfg_set_model_source_subdirs: Optional[List[str]] = None, custom_py_modules: Optional[Dict[str, pydantic.types.DirectoryPath]] = None, output_files: Optional[dict] = None, debug_level_updates: Optional[Dict[str, dict]] = None)[source]#

Bases: utopya._yaml_registry.entry.BaseSchema

The data model for a project registry entry

project_name: str#
framework_name: Optional[str]#
paths: utopya.project_registry.ProjectPaths#
metadata: utopya.project_registry.ProjectMetadata#
settings: utopya.project_registry.ProjectSettings#
run_cfg_format: str#
cfg_set_abs_search_dirs: Optional[List[str]]#
cfg_set_model_source_subdirs: Optional[List[str]]#
custom_py_modules: Optional[Dict[str, pydantic.types.DirectoryPath]]#
output_files: Optional[dict]#
debug_level_updates: Optional[Dict[str, dict]]#
_abc_impl = <_abc_data object>#
class utopya.project_registry.Project(name: str, *, registry: YAMLRegistry = None, **data)[source]#

Bases: utopya._yaml_registry.entry.RegistryEntry

A registry entry that describes a project

SCHEMA#

alias of utopya.project_registry.ProjectSchema

property framework_project: Optional[utopya.project_registry.Project]#

If a framework project is defined, retrieve it from the registry

get_git_info(*, include_patch_info: bool = False) dict[source]#

Returns information about the state of this project’s git repository using the python-git-info package.

If no git information is retrievable, e.g. because the project’s base_dir does not contain a git repository, will still return a dict but with the have_git_info entry set to False.

Otherwise the git information will be in the latest_commit entry.

Parameters

include_patch_info (bool, optional) – If True, will attempt a subprocess call to git and store patch information alongside in the diff entry. In that case, the dirty entry will denote whether there were uncommitted changes.

Returns

A dict containing information about the associated git repo.

Return type

dict

class utopya.project_registry.ProjectRegistry(registry_dir: Optional[str] = None)[source]#

Bases: utopya._yaml_registry.registry.YAMLRegistry

The project registry

__init__(registry_dir: Optional[str] = None)[source]#

Initializes the project registry, loading available entries from the registry directory in the utopya config directory.

This also creates the projects directory, if not created yet.

Parameters

registry_dir (str, optional) – A custom projects

register(*, base_dir: str, info_file: Optional[str] = None, custom_project_name: Optional[str] = None, require_matching_names: Optional[bool] = None, exists_action: str = 'raise') dict[source]#

Register or update information of a project.

Parameters
  • base_dir (str) – Project base directory

  • info_file (str, optional) – Path to info file which contains further path information and metadata (may be relative to base directory). If not given, will use some defaults to search for it.

  • custom_project_name (str, optional) – Custom project name, overwrites the one given in the info file

  • require_matching_names (bool, optional) – If set, will require that the custom project name is equal to the one given in the project info file. This allows checking that the file content does not diverge from some outside state.

  • exists_action (str, optional) – Action to take upon existing project

Returns

Project information for the new or validated project

Return type

dict

utopya.project_registry.PROJECTS = <utopya.project_registry.ProjectRegistry object>#

The package-wide project registry

utopya.reporter module#

Implementation of the reporter framework which can be used to report on the progress or result of operations within utopya.

class utopya.reporter.ReportFormat(*, parser: Callable, writers: List[Callable], min_report_intv: Optional[float] = None)[source]#

Bases: object

A report format aggregates callables for a single report parser and potentially multiple report writers. As a whole, it contains all arguments needed to generate a certain kind of report.

It is used in utopya.reporter.Reporter and derived classes, which are the classes that actually implement the parsers and writers.

__init__(*, parser: Callable, writers: List[Callable], min_report_intv: Optional[float] = None)[source]#

Initializes a ReportFormat object, which gathers callables needed to create a report in a certain format.

Parameters
  • parser (Callable) – The parser method to use

  • writers (List[Callable]) – The writer method(s) to use

  • min_report_intv (float, optional) – The minimum report interval of reports in this format. Determines the time (in seconds) that needs to have passed before the next report will be emitted.

property min_report_intv: Optional[datetime.timedelta]#

Returns the minimum report interval, i.e. the time that needs to have passed between two reports.

property reporting_blocked: bool#

Determines whether this ReportFormat may be blocked from emission, e.g. because of the minimum report interval not having passed yet.

If no minimum report interval is given, will always return False. Otherwise checks if at least that interval has passed since the last report.
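The blocking logic described above can be sketched as a small time gate. This is an illustration only: the class IntervalGate is hypothetical, and treating "no previous report" as not blocked is an assumption of this sketch.

```python
from datetime import datetime, timedelta
from typing import Optional


class IntervalGate:
    """Hypothetical sketch of a minimum-interval check for reports."""

    def __init__(self, min_report_intv: Optional[float] = None):
        # Interval is given in seconds and stored as timedelta
        self.min_report_intv = (
            timedelta(seconds=min_report_intv)
            if min_report_intv is not None
            else None
        )
        self.last_report: Optional[datetime] = None

    def blocked(self, now: datetime) -> bool:
        """True if the minimum interval has not yet passed."""
        if self.min_report_intv is None or self.last_report is None:
            return False
        return (now - self.last_report) < self.min_report_intv

    def report(self, *, force: bool = False,
               now: Optional[datetime] = None) -> bool:
        """Return whether a report would be emitted at time ``now``."""
        now = now if now is not None else datetime.now()
        if not force and self.blocked(now):
            return False
        self.last_report = now
        return True


gate = IntervalGate(min_report_intv=2.0)
t0 = datetime(2024, 1, 1, 12, 0, 0)
first = gate.report(now=t0)                          # emitted
second = gate.report(now=t0 + timedelta(seconds=1))  # blocked
forced = gate.report(force=True, now=t0 + timedelta(seconds=1))
```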

report(*, force: bool = False, parser_kwargs: Optional[dict] = None) bool[source]#

Parses and writes a report corresponding to the callables defined in this report format.

Parameters
  • force (bool, optional) – If True, will ignore the minimum report interval and always perform a report.

  • parser_kwargs (dict, optional) – Keyword arguments passed on to the parser

Returns

Whether a report was generated or not

Return type

bool

class utopya.reporter.Reporter(*, report_formats: Optional[Union[List[str], Dict[str, dict]]] = None, default_format: Optional[str] = None, report_dir: Optional[str] = None, suppress_cr: bool = False)[source]#

Bases: object

The Reporter class holds general reporting capabilities.

It needs to be subclassed in order to specialize its reporting functions.

__init__(*, report_formats: Optional[Union[List[str], Dict[str, dict]]] = None, default_format: Optional[str] = None, report_dir: Optional[str] = None, suppress_cr: bool = False)[source]#

Initialize the Reporter base class.

Parameters
  • report_formats (Union[List[str], Dict[str, dict]], optional) – The report formats to use with this reporter. If given as list of strings, the strings are the names of the report formats as well as those of the parsers; all other parameters are the defaults. If given as dict of dicts, the keys are the names of the formats and the inner dicts are the parameters to create report formats from.

  • default_format (str, optional) – The name of the default report format; if None is given, the .report method requires the name of a report format.

  • report_dir (str, optional) – If reporting to a file, this is the base directory that is reported to.

  • suppress_cr (bool, optional) – Whether to suppress carriage return characters in writers. This option is useful when the reporter is not the only class that writes to a stream.

property report_formats: dict#

Returns the dict of ReportFormat objects.

property default_format: Union[None, utopya.reporter.ReportFormat]#

Returns the default report format or None, if not set.

property suppress_cr: bool#

Whether to suppress a carriage return. Objects using the reporter can set this property to communicate that they will be putting content into the stdout stream as well. The writers can check this property and adjust their behaviour accordingly.

add_report_format(name: str, *, parser: Optional[str] = None, write_to: Union[str, Dict[str, dict]] = 'stdout', min_report_intv: Optional[float] = None, rf_kwargs: Optional[dict] = None, **parser_kwargs)[source]#

Add a report format to this reporter.

Parameters
  • name (str) – The name of this format

  • parser (str, optional) – The name of the parser; if not given, the name of the report format is assumed

  • write_to (Union[str, Dict[str, dict]], optional) – The name of the writer. If this is a dict of dict, the keys will be interpreted as the names of the writers and the nested dict as the **kwargs to the writer function.

  • min_report_intv (float, optional) – The minimum report interval (in seconds) for this report format

  • rf_kwargs (dict, optional) – Further kwargs to ReportFormat.__init__

  • **parser_kwargs – The kwargs to the parser function

Raises

ValueError – A report format with this name already exists

report(report_format: Optional[str] = None, **kwargs) bool[source]#

Create a report with the given format; if none is given, the default format is used.

Parameters
  • report_format (str, optional) – The report format to use

  • **kwargs – Passed on to the ReportFormat.report() call

Returns

Whether there was a report

Return type

bool

Raises

ValueError – If no default format was set and no report format name was given

parse_and_write(*, parser: Union[str, Callable], write_to: Union[str, Callable], **parser_kwargs)[source]#

This function allows selecting a parser and writer explicitly.

Parameters
  • parser (Union[str, Callable]) – The parser method to use.

  • write_to (Union[str, Callable]) – The write method to use. Can also be a sequence of names and/or callables or a Dict. For allowed specification formats, see the ._resolve_writers method.

  • **parser_kwargs – Passed to the parser, if given

_resolve_parser(parser: Union[str, Callable], **parser_kwargs) Callable[source]#

Given a string or a callable, returns the corresponding callable.

Parameters
  • parser (Union[str, Callable]) – If a callable is already given, returns that; otherwise looks for a parser method with the given name in the attributes of this class.

  • **parser_kwargs – Arguments that should be passed to the parser. If given, a new function is created where these arguments are already included.

Returns

The desired parser function

Return type

Callable

Raises

ValueError – If no parser with the given name is available
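The resolution described above (look up a method by name, then bind keyword arguments) can be sketched with functools.partial. The class MiniReporter, the _parse_ method prefix, and the progress parser are assumptions made for this example; they are not taken from the utopya source.

```python
from functools import partial
from typing import Callable, Union


class MiniReporter:
    """Hypothetical reporter with a single parser method."""

    def _parse_progress(self, *, done: int = 0, total: int = 1) -> str:
        return f"{done}/{total} finished"

    def resolve_parser(
        self, parser: Union[str, Callable], **parser_kwargs
    ) -> Callable:
        """Return a callable parser, binding ``parser_kwargs`` if given."""
        if not callable(parser):
            # Assumed naming convention: parser methods start with _parse_
            try:
                parser = getattr(self, f"_parse_{parser}")
            except AttributeError as err:
                raise ValueError(
                    f"No parser named '{parser}' available!"
                ) from err

        if parser_kwargs:
            # New function with the given arguments already included
            return partial(parser, **parser_kwargs)
        return parser


reporter = MiniReporter()
parse = reporter.resolve_parser("progress", done=3, total=10)
print(parse())  # 3/10 finished
```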

_resolve_writers(write_to) Dict[str, Callable][source]#

Resolves the given argument to a dict of callable writer functions, keyed by writer name.

Parameters

write_to

a specification of the writers to use. Allows many different ways of specifying the writer functions, depending on the type of the argument:

  • str: the name of the writer method of this reporter

  • Callable: the writer function to use

  • sequence of str and/or Callable: the names and/or functions to use

  • Dict[str, dict]: the names of the writer functions and additional keyword arguments.

If the type is wrong, will raise.

Returns

the writers (key: name, value: writer method)

Return type

Dict[str, Callable]

Raises
  • TypeError – Invalid write_to argument

  • ValueError – A writer with that name was already added or a writer with the given name is not available.
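The type-dependent handling of write_to can be sketched as follows. This is a hedged illustration: resolve_writers here is a free function taking a hypothetical available mapping of writer names to functions, whereas the documented method looks writers up on the reporter itself.

```python
from functools import partial
from typing import Callable, Dict


def resolve_writers(write_to, available: Dict[str, Callable]) -> Dict[str, Callable]:
    """Resolve a writer specification to a dict of name -> callable."""
    # A single name or callable is treated like a one-element sequence
    if isinstance(write_to, str) or callable(write_to):
        write_to = [write_to]

    if isinstance(write_to, dict):
        specs = list(write_to.items())           # (name, kwargs) pairs
    elif isinstance(write_to, (list, tuple)):
        specs = [(item, {}) for item in write_to]
    else:
        raise TypeError(f"Invalid write_to argument: {type(write_to)}")

    writers: Dict[str, Callable] = {}
    for item, kwargs in specs:
        if callable(item):
            name, func = getattr(item, "__name__", repr(item)), item
        elif isinstance(item, str) and item in available:
            name, func = item, available[item]
        else:
            raise ValueError(f"No writer named '{item}' available!")

        if name in writers:
            raise ValueError(f"A writer named '{name}' was already added!")
        writers[name] = partial(func, **kwargs) if kwargs else func
    return writers


# Hypothetical writers for demonstration
sink = []
available = {
    "stdout": print,
    "collect": lambda s, prefix="": sink.append(prefix + s),
}
writers = resolve_writers({"collect": {"prefix": "> "}}, available)
writers["collect"]("hello")
```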

_write_to_stdout(s: str, *, flush: bool = True, **print_kws)[source]#

Writes the given string to stdout using the print function.

Parameters
  • s (str) – The string to write

  • flush (bool, optional) – Whether to flush directly; default: True

  • **print_kws – Other print function keyword arguments

_write_to_stdout_noreturn(s: str, *, prepend='  ')[source]#

Writes to stdout without ending the line. Always flushes.

Parameters
  • s (str) – The string to write

  • prepend (str, optional) – Is prepended to the string; useful because the cursor might block this point of the terminal

  • report_no (int, optional) – accepted from ReportFormat call

_write_to_log(s: str, *, lvl: int = 10, skip_if_empty: bool = False)[source]#

Writes the given string via the logging module.

Parameters
  • s (str) – The string to log

  • lvl (int, optional) – The level at which to log; default is 10, corresponding to the DEBUG level

  • skip_if_empty (bool, optional) – Whether to skip writing if s is empty.

_write_to_file(s: str, *, path: str = '_report.txt', mode: str = 'w', skip_if_empty: bool = False)[source]#

Writes the given string to a file

Parameters
  • s (str) – The string to write

  • path (str, optional) – The path to write it to; will be assumed relative to the report_dir attribute; if that is not given, path needs to be absolute. By default, assumes that there is a report_dir given.

  • mode (str, optional) – Writing mode of that file

  • skip_if_empty (bool, optional) – Whether to skip writing if s is empty.

Raises

ValueError – If report_dir was not set and path is relative.

class utopya.reporter.WorkerManagerReporter(wm: utopya.workermanager.WorkerManager, *, mv: utopya.multiverse.Multiverse = None, **reporter_kwargs)[source]#

Bases: utopya.reporter.Reporter

This class specializes the base Reporter to report on the WorkerManager state and its progress.

TTY_MARGIN = 4#

Margin to use when writing to terminal

PROGRESS_BAR_SYMBOLS = {'active': '░', 'active_progress': '▒', 'finished': '▓', 'space': ' '}#

Symbols to use in progress bar parser

__init__(wm: utopya.workermanager.WorkerManager, *, mv: utopya.multiverse.Multiverse = None, **reporter_kwargs)[source]#

Initialize the specialized reporter for the WorkerManager.

It is aware of the WorkerManager and may additionally have access to the Multiverse it is embedded in, which provides additional information to report parsers.

Parameters
  • wm (utopya.workermanager.WorkerManager) – The associated WorkerManager instance

  • mv (utopya.multiverse.Multiverse, optional) – The Multiverse this reporter is used in. If this is provided, it can be used in report parsers, e.g. to provide additional information on simulations.

  • **reporter_kwargs – Passed on to parent method

property wm: utopya.workermanager.WorkerManager#

Returns the associated WorkerManager

property task_counters: collections.OrderedDict#

Returns a dict of task counters containing the following entries:

  • total: total number of registered WorkerManager tasks

  • active: number of currently active tasks

  • finished: number of finished tasks, including tasks that were stopped via a stop condition

  • stopped: number of tasks for which stop conditions were fulfilled, see Stop Conditions

property wm_progress: float#

The WorkerManager’s progress, between 0 and 1.

property wm_active_tasks_progress: float#

The active tasks’ progress. If there are no active tasks in the worker manager, returns 0.

property wm_elapsed: Optional[datetime.timedelta]#

Time elapsed since the start of working, or None if not yet started

property wm_times: dict#

Return the characteristics of WorkerManager times. Calls get_progress_info() without any additional arguments.

register_task(task: utopya.task.WorkerTask)[source]#

Given the task object, extracts and stores some information like its run time or its exit code. Exit codes are aggregated over multiple registrations.

This can be used as a callback function from a WorkerTask object.

Parameters

task (utopya.task.WorkerTask) – The WorkerTask to extract information from.

calc_runtime_statistics(min_num: int = 10) Optional[collections.OrderedDict][source]#

Calculates the current runtime statistics.

Parameters

min_num (int, optional) – Minimum number of runtimes that need to be registered for these statistics to actually be computed. If below this number, will return None.

Returns

The runtime statistics or None, if there were too few entries.

Return type

Union[OrderedDict, None]
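
The min_num guard can be sketched with the standard library as follows. The function name runtime_statistics and the selection of statistics are assumptions for this example; which quantities utopya actually computes is not stated here.

```python
import statistics
from collections import OrderedDict
from typing import List, Optional


def runtime_statistics(
    runtimes: List[float], *, min_num: int = 10
) -> Optional[OrderedDict]:
    # Too few registered runtimes: statistics would not be meaningful
    if len(runtimes) < min_num:
        return None

    return OrderedDict(
        mean=statistics.mean(runtimes),
        std=statistics.pstdev(runtimes),
        min=min(runtimes),
        median=statistics.median(runtimes),
        max=max(runtimes),
    )
```

Returning None instead of raising lets a report parser simply skip the statistics section early in a run.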

get_progress_info(**eta_options) Dict[str, float][source]#

Compiles a dict containing progress information for the current work session.

Parameters

**eta_options – Passed on to method calculating est_left, _compute_est_left().

Returns

Progress information. Guaranteed to contain the keys start, now, elapsed, est_left, est_end, and end.

Return type

Dict[str, float]

_compute_est_left(*, progress: float, elapsed: datetime.timedelta, mode: str = 'from_start', progress_buffer_size: int = 60) datetime.timedelta[source]#

Computes the estimated time left until the end of the work session (ETA) using the current progress value and the elapsed time. Depending on mode, additional information may be included in the calculation.

Parameters
  • progress (float) – The current progress value, in (0, 1]

  • elapsed (datetime.timedelta) – The elapsed time since start

  • mode (str, optional) –

    By which mode to calculate the ETA. Available modes are:

    • from_start, where ETA is computed from the start of the work session.

    • from_buffer, where ETA is computed from a more recent point during the work session. This uses a buffer to keep track of recent progress and computes the ETA against the oldest record (controlled by argument progress_buffer_size), giving more accurate estimates for long-running work sessions.

  • progress_buffer_size (int, optional) – The size of the ring buffer used in from_buffer mode.

Returns

Estimate for how much time is left until the end of the work session.

Return type

datetime.timedelta
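
The two ETA modes can be illustrated with a linear extrapolation. This is a sketch under the assumption that progress advances roughly linearly in time; the names eta_from_start and BufferedETA are hypothetical and the actual computation in utopya may differ.

```python
from collections import deque
from datetime import timedelta


def eta_from_start(progress: float, elapsed: timedelta) -> timedelta:
    """Extrapolates from the start of the session; progress in (0, 1]."""
    return elapsed * (1.0 - progress) / progress


class BufferedETA:
    """Extrapolates from the oldest record in a ring buffer."""

    def __init__(self, buffer_size: int = 60):
        # A deque with maxlen drops the oldest record automatically
        self.records = deque(maxlen=buffer_size)

    def update(self, progress: float, elapsed: timedelta) -> timedelta:
        self.records.append((progress, elapsed))
        old_progress, old_elapsed = self.records[0]

        d_progress = progress - old_progress
        if d_progress <= 0:
            # No progress within the buffer: fall back to the simple mode
            return eta_from_start(progress, elapsed)

        # Time needed per unit of progress, measured over the buffer
        time_per_progress = (elapsed - old_elapsed) / d_progress
        return time_per_progress * (1.0 - progress)
```

With a buffer, a session that speeds up or slows down over time is estimated from its recent rate rather than from its average rate since the start.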

_parse_task_counters(*, report_no: Optional[int] = None) str[source]#

Return a string that shows the task counters of the WorkerManager

Parameters

report_no (int, optional) – A counter variable passed by the ReportFormat call, indicating how often this parser was called so far.

Returns

A str representation of the task counters of the WorkerManager

Return type

str

_parse_progress(*, report_no: Optional[int] = None) str[source]#

Returns a progress string

Parameters

report_no (int, optional) – A counter variable passed by the ReportFormat call, indicating how often this parser was called so far.

Returns

A simple progress indicator

Return type

str

_parse_progress_bar(*, num_cols: Union[str, int] = 'fixed', fstr: str = '  ╠{ticks[0]:}{ticks[1]:}{ticks[2]:}{ticks[3]:}╣ {info:}{times:}', info_fstr: str = '{total_progress:>5.1f}% ', show_times: bool = False, times_fstr: str = '| {elapsed:} elapsed | ~{est_left:} left ', times_fstr_final: str = '| finished in {elapsed:} ', times_kwargs: dict = {}, report_no: Optional[int] = None) str[source]#

Returns a progress bar.

It shows the amount of finished tasks, active tasks, and a percentage.

Parameters
  • num_cols (Union[str, int], optional) – The number of columns available for creating the progress bar. Can also be a string adaptive to poll terminal size upon each call, or fixed to use the number of columns determined at import time.

  • fstr (str, optional) – The format string for the final output. Should contain the ticks 4-tuple, which makes up the progress bar, and can optionally contain the info and times segments, formatted using the respective format string arguments.

  • info_fstr (str, optional) –

    The format string for the info section of the final output. Available keys:

    • total_progress

    • active_progress

    • cnt, the task counters dictionary, see task_counters()

  • show_times (bool, optional) – Whether to show a short version of the results of the times parser

  • times_fstr (str, optional) – Format string for times information

  • times_fstr_final (str, optional) – Format string for times information once the work session has ended

  • times_kwargs (dict, optional) – Passed on to times parser. Only used if show_times is set.

  • report_no (int, optional) – A counter variable passed by the ReportFormat call, indicating how often this parser was called so far.

Returns

The one-line progress bar

Return type

str
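
The interplay of the default fstr, the ticks 4-tuple, and info_fstr can be sketched as follows. The order of the four segments (finished, active progress, active, space) and the function render_bar are assumptions for this example; the percentage is simplified to the share of finished columns.

```python
# Symbols as given in PROGRESS_BAR_SYMBOLS
SYMBOLS = {"active": "░", "active_progress": "▒", "finished": "▓", "space": " "}


def render_bar(
    n_finished: int, n_active_progress: int, n_active: int, num_cols: int = 20
) -> str:
    fstr = "  ╠{ticks[0]:}{ticks[1]:}{ticks[2]:}{ticks[3]:}╣ {info:}{times:}"
    info_fstr = "{total_progress:>5.1f}% "

    # Remaining columns are filled with the space symbol
    n_space = num_cols - n_finished - n_active_progress - n_active
    ticks = (
        SYMBOLS["finished"] * n_finished,
        SYMBOLS["active_progress"] * n_active_progress,
        SYMBOLS["active"] * n_active,
        SYMBOLS["space"] * n_space,
    )

    # Simplification: progress is taken as the share of finished columns
    info = info_fstr.format(total_progress=100 * n_finished / num_cols)
    return fstr.format(ticks=ticks, info=info, times="")


print(render_bar(10, 2, 3))
```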

_parse_times(*, fstr: str = 'Elapsed:  {elapsed:<8s}  |  Est. left:  {est_left:<8s}  |  Est. end:  {est_end:<10s}', timefstr_short: str = '%H:%M:%S', timefstr_full: str = '%d.%m., %H:%M:%S', use_relative: bool = True, times: Optional[dict] = None, report_no: Optional[int] = None, **progress_info_kwargs) str[source]#

Parses the WorkerManager’s time information, including estimated time left or others.

Parameters
  • fstr (str, optional) – The main format string; gets as keys the results of the WorkerManager time information. Available keys: elapsed, est_left, est_end, start, now, end.

  • timefstr_short (str, optional) – A time format string for absolute dates; short version.

  • timefstr_full (str, optional) – A time format string for absolute dates; long (ideally: full) version.

  • use_relative (bool, optional) – Whether to use relative dates (e.g. Today, 13:37) for a date difference of 1.

  • times (dict, optional) – A dict of times to use; this is mainly for testing purposes!

  • report_no (int, optional) – The report number passed by ReportFormat

  • **progress_info_kwargs – Passed on to method calculating progress get_progress_info()

Returns

A string representation of the time information

Return type

str

_parse_runtime_stats(*, fstr: str = '  {k:<13s} {v:}', join_char='\n', ms_precision: int = 1, report_no: Optional[int] = None) str[source]#

Parses the runtime statistics dict into a multiline string

Parameters
  • fstr (str, optional) – The format string to use. Gets passed the keys k and v where k is the name of the entry and v its value. Note that v is a non-numeric value.

  • join_char (str, optional) – The join character / string to join the elements together.

  • ms_precision (int, optional) – Number of digits to represent the milliseconds part of the runtimes.

  • report_no (int, optional) – A counter variable passed by the ReportFormat call, indicating how often this parser was called so far.

Returns

The multi-line runtime statistics

Return type

str

_parse_report(*, fstr: str = '  {k:<{w:}s}  {v:}', min_num: int = 4, report_no: Optional[int] = None, show_individual_runtimes: bool = True, task_label_singular: str = 'task', task_label_plural: str = 'tasks') str[source]#

Parses a report for all tasks that were being worked on into a multiline string. The headings can be adjusted by keyword arguments.

Parameters
  • fstr (str, optional) – The format string to use. Gets passed the keys k and v where k is the name of the entry and v its value. Note that this format string is also used with v being a non-numeric value. Also, w can be used to have a key column of constant width.

  • min_num (int, optional) – The minimum number of universes needed to calculate runtime statistics.

  • report_no (int, optional) – A counter variable passed by the ReportFormat call, indicating how often this parser was called so far.

  • show_individual_runtimes (bool, optional) – Whether to report individual universe runtimes; default: True. This should be disabled if there are a huge number of universes.

  • task_label_singular (str, optional) – The label to use in the report when referring to a single task.

  • task_label_plural (str, optional) – The label to use in the report when referring to multiple tasks.

Returns

The multi-line simulation report string

Return type

str

_parse_pspace_info(*, fstr: str, report_no: Optional[int] = None) str[source]#

Provides information about the parameter space.

Extracts the parameter_space from the associated Multiverse’s meta configuration and provides information on that.

If there are multiple tasks specified, it is assumed that a sweep is or was being carried out and an information string is retrieved from the paramspace.paramspace.ParamSpace object, which is then returned. If only a single task was defined, returns an empty string.

Parameters

report_no (int, optional) – A counter variable passed by the ReportFormat call, indicating how often this parser was called so far.

Returns

If there is more than one task, returns the result of paramspace.paramspace.ParamSpace.get_info_str(). If not, returns a string denoting that there was only one task.

Return type

str

_write_to_file(*args, path: str = '_report.txt', cluster_mode_path: str = '{0:}_{node_name:}{ext:}', **kwargs)[source]#

Overloads the parent method with capabilities needed in cluster mode

All args and kwargs are passed through. If in cluster mode, the path is changed such that it includes the name of the node.

Parameters
  • *args – Passed on to parent method

  • path (str, optional) – The path to save to

  • cluster_mode_path (str, optional) – The format string to use for the path in cluster mode. Must contain the format key {0:}, which holds the given path with the extension split off. The extension is available via ext (already includes the dot). Additional format keys: node_name, job_id.

  • **kwargs – Passed on to parent method
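
How the cluster_mode_path format string transforms a path can be sketched with os.path.splitext. The function cluster_report_path is a hypothetical helper for this example and not part of utopya.

```python
import os


def cluster_report_path(
    path: str,
    *,
    node_name: str,
    job_id: str,
    cluster_mode_path: str = "{0:}_{node_name:}{ext:}",
) -> str:
    # Split off the extension; ext includes the leading dot
    base, ext = os.path.splitext(path)
    return cluster_mode_path.format(
        base, node_name=node_name, job_id=job_id, ext=ext
    )


print(cluster_report_path("_report.txt", node_name="node042", job_id="1234"))
```

Including the node name in the file name avoids several nodes of the same job writing to one report file.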

utopya.stop_conditions module#

This module implements the StopCondition class, which is used by the WorkerManager to stop a worker process in certain situations.

In addition, it implements a set of basic stop condition functions and provides the stop_condition_function() decorator which is required to make them accessible by name.

utopya.stop_conditions.SIG_STOPCOND = 'SIGUSR1'#

Signal to use for stopping workers with fulfilled stop conditions

utopya.stop_conditions.STOP_CONDITION_FUNCS: Dict[str, Callable] = {'check_monitor_entry': <function check_monitor_entry>, 'timeout_wall': <function timeout_wall>}#

Registered stop condition functions are stored in this dictionary. These functions evaluate whether a certain stop condition is actually fulfilled.

To that end, a WorkerTask object is passed to these functions, the information in which can be used to determine whether the condition is fulfilled. The signature of these functions is: (task: WorkerTask, **kws) -> bool

utopya.stop_conditions._FAILED_MONITOR_ENTRY_CHECKS = []#

Keeps track of failed monitor entry checks in the check_monitor_entry() stop condition function in order to avoid repetitive warnings.

class utopya.stop_conditions.StopCondition(*, to_check: Optional[List[dict]] = None, name: Optional[str] = None, description: Optional[str] = None, enabled: bool = True, func: Optional[Union[Callable, str]] = None, **func_kwargs)[source]#

Bases: object

A StopCondition object holds information on the conditions in which a worker process should be stopped.

__init__(*, to_check: Optional[List[dict]] = None, name: Optional[str] = None, description: Optional[str] = None, enabled: bool = True, func: Optional[Union[Callable, str]] = None, **func_kwargs)[source]#

Create a new stop condition object.

Parameters
  • to_check (List[dict], optional) – A list of dicts that hold the functions to call and the arguments to call them with. The only requirement for each dict is that the func key is available. All other keys are unpacked and passed as kwargs to the given function. The func key can be either a callable or a string corresponding to a name in the module-level STOP_CONDITION_FUNCS registry.

  • name (str, optional) – The name of this stop condition

  • description (str, optional) – A short description of this stop condition

  • enabled (bool, optional) – Whether this stop condition should be checked; if False, it will be created but will always be unfulfilled when checked.

  • func (Union[Callable, str], optional) – (For the short syntax only!) If no to_check argument is given, a function can be given here that will be the only one that is checked. If this argument is a string, it is also resolved from the module-level STOP_CONDITION_FUNCS registry.

  • **func_kwargs – (For the short syntax) The kwargs that are passed to the single stop condition function

property fulfilled_for: Set[utopya.task.Task]#

The set of tasks this stop condition was fulfilled for

static _resolve_sc_funcs(to_check: List[dict], func: Union[str, Callable], func_kwargs: dict) List[tuple][source]#

Resolves the functions and kwargs that are to be checked.

The callable is either retrieved from the module-level stop condition functions registry or, if the given func is already a callable, that one will be used.

__str__() str[source]#

A string representation for this StopCondition, including the name and, if given, the description.

fulfilled(task: utopya.task.Task) bool[source]#

Checks if the stop condition is fulfilled for the given worker, using the information from the dict.

All given stop condition functions are evaluated; if all of them return True, this method will also return True.

Furthermore, if the stop condition is fulfilled, the task’s set of fulfilled stop conditions will

Parameters

task (utopya.task.Task) – Task object that is to be checked

Returns

If all stop condition functions returned true for the given worker and its current information

Return type

bool
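
The all-must-hold evaluation can be sketched as follows. The function is_fulfilled and the example conditions are hypothetical; treating an empty to_check list as unfulfilled is an assumption of this sketch.

```python
from typing import Callable, List, Tuple


def is_fulfilled(
    task, to_check: List[Tuple[Callable, dict]], *, enabled: bool = True
) -> bool:
    # A disabled stop condition is never fulfilled
    if not enabled or not to_check:
        return False

    # Every function has to return True for the condition to be fulfilled
    return all(func(task, **kws) for func, kws in to_check)


def older_than(task: dict, *, seconds: float) -> bool:
    return task["runtime"] > seconds


def below(task: dict, *, key: str, value: float) -> bool:
    return task[key] < value


task = {"runtime": 12.0, "density": 0.01}
checks = [(older_than, {"seconds": 10}), (below, {"key": "density", "value": 0.1})]
print(is_fulfilled(task, checks))
```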

yaml_tag = '!stop-condition'#
classmethod to_yaml(representer, node)[source]#

Creates a yaml representation of the StopCondition object by storing the initialization kwargs as a yaml mapping.

Parameters
  • representer (ruamel.yaml.representer) – The representer module

  • node (StopCondition) – The node, i.e. an instance of this class

Returns

a yaml mapping that is able to recreate this object

classmethod from_yaml(constructor, node)[source]#

Creates a StopCondition object by unpacking the given mapping such that all stored arguments are available to __init__.

utopya.stop_conditions.stop_condition_function(f: Callable)[source]#

A decorator that registers the decorated callable in the module-level stop condition function registry. The callable’s __name__ attribute will be used as the key.

Parameters

f (Callable) – A callable that is to be added to the function registry.

Raises

AttributeError – If the name already exists in the registry
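
A registering decorator of this kind can be sketched as follows. REGISTRY and register are hypothetical names standing in for the module-level registry and the decorator; the example function always_true follows the documented signature (task, **kws) -> bool.

```python
from typing import Callable, Dict

# Hypothetical stand-in for the module-level registry
REGISTRY: Dict[str, Callable] = {}


def register(f: Callable) -> Callable:
    """Adds f to the registry under its __name__ and returns it unchanged."""
    if f.__name__ in REGISTRY:
        raise AttributeError(
            f"A function named '{f.__name__}' is already registered!"
        )
    REGISTRY[f.__name__] = f
    return f


@register
def always_true(task, **kws) -> bool:
    return True
```

Because the decorator returns the function unchanged, the decorated function remains usable directly as well as by name.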

utopya.stop_conditions.timeout_wall(task: utopya.task.WorkerTask, *, seconds: float) bool[source]#

Checks the wall timeout of the given worker

Parameters
  • task (utopya.task.WorkerTask) – The WorkerTask object to check

  • seconds (float) – After how many seconds to trigger the wall timeout

Returns

Whether the timeout is fulfilled

Return type

bool

utopya.stop_conditions.check_monitor_entry(task: utopya.task.WorkerTask, *, entry_name: str, operator: str, value: float) bool[source]#

Checks if a monitor entry compares in a certain way to a given value

Parameters
  • task (utopya.task.WorkerTask) – The WorkerTask object to check

  • entry_name (str) – The name of the monitor entry, leading to the value to the left-hand side of the operator

  • operator (str) – The binary operator to use

  • value (float) – The right-hand side value to compare to

Returns

Result of op(entry, value)

Return type

bool
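
The comparison op(entry, value) can be sketched with the standard operator module. The set of accepted operator strings, the plain dict used as monitor data, and the function check_entry are assumptions for this example.

```python
import operator as op

# Assumed mapping from operator strings to binary functions
OPERATORS = {
    "==": op.eq,
    "!=": op.ne,
    "<": op.lt,
    "<=": op.le,
    ">": op.gt,
    ">=": op.ge,
}


def check_entry(
    monitor: dict, *, entry_name: str, operator: str, value: float
) -> bool:
    # A missing entry cannot fulfil the condition
    if entry_name not in monitor:
        return False

    # The monitor entry is the left-hand side, value the right-hand side
    return OPERATORS[operator](monitor[entry_name], value)


print(check_entry({"density": 0.02}, entry_name="density", operator="<", value=0.1))
```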

utopya.task module#

The Task class supplies a container for all information needed for a task.

The WorkerTask and ProcessTask classes specialize on tasks for the WorkerManager that work on subprocesses or multiprocessing processes.

utopya.task._ANSI_ESCAPE = re.compile('\\x1B(?:[@-Z\\\\-_]|\\[[0-?]*[ -/]*[@-~])')#

A regex pattern to remove ANSI escape characters, needed for stream saving

From: https://stackoverflow.com/a/14693789/1827608
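
As a short usage sketch, the pattern shown above can be applied with re.sub to strip color codes from a line before saving it. The helper strip_ansi is a hypothetical name for this example.

```python
import re

# Same pattern as above, written as a raw string
ANSI_ESCAPE = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")


def strip_ansi(line: str) -> str:
    return ANSI_ESCAPE.sub("", line)


print(strip_ansi("\x1b[31mERROR\x1b[0m something failed"))
```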

utopya.task._follow(f: _io.TextIOWrapper, delay: float = 0.05, should_stop: typing.Callable = <function <lambda>>) Generator[str, None, None][source]#

Generator that follows the output written to the given stream object and yields each new line written to it. If no output is retrieved, there will be a delay to reduce processor load.

The should_stop argument may be a callable that will lead to breaking out of the waiting loop. If it is not given, the loop will only break if reading from the stream f is no longer possible, e.g. because it was closed.
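
A follow generator with these semantics can be sketched as below. This is an illustration under the assumption that an empty read means "no new output yet"; the function follow is a hypothetical re-implementation and not utopya's code.

```python
import io
import time
from typing import Callable, Iterator, TextIO


def follow(
    f: TextIO, delay: float = 0.05, should_stop: Callable = lambda: False
) -> Iterator[str]:
    while not should_stop():
        try:
            line = f.readline()
        except ValueError:
            # Reading is no longer possible, e.g. the stream was closed
            break

        if not line:
            # Nothing new was written; wait to reduce processor load
            time.sleep(delay)
            continue

        yield line


stream = io.StringIO("first\nsecond\n")
lines = []
for line in follow(stream, delay=0.0, should_stop=lambda: len(lines) >= 2):
    lines.append(line)
print(lines)
```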

utopya.task.enqueue_lines(*, queue: queue.Queue, stream: TextIO, follow: bool = False, parse_func: Optional[Callable] = None) None[source]#

From the given text stream, read line-buffered lines and add them to the provided queue as 2-tuples, (line, parsed object).

This function is meant to be passed to an individual thread in which it can read individual lines separately from the main thread. Before exiting this function, the stream is closed.

Parameters
  • queue (queue.Queue) – The queue object to put the read line and parsed objects into.

  • stream (TextIO) – The stream identifier. If this is not a text stream, be aware that the elements added to the queue might need decoding.

  • follow (bool, optional) – Whether the _follow() function should be used instead of iter(stream.readline). This should be selected if the stream is file-like instead of sys.stdout-like.

  • parse_func (Callable, optional) – A parse function that the read line is passed through. This should be a unary function that either returns a successfully parsed line or None.
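
Reading a stream in a separate thread and handing over (line, parsed object) tuples via a queue can be sketched as follows. The names enqueue_lines_sketch and parse_int are hypothetical, and stripping the trailing newline is an assumption of this example.

```python
import io
import queue
import threading
from typing import Callable, Optional, TextIO


def enqueue_lines_sketch(
    *, q: queue.Queue, stream: TextIO, parse_func: Optional[Callable] = None
) -> None:
    # iter() with a sentinel stops once readline returns an empty string
    for line in iter(stream.readline, ""):
        line = line.rstrip("\n")
        parsed = parse_func(line) if parse_func is not None else None
        q.put_nowait((line, parsed))

    # Close the stream before exiting
    stream.close()


def parse_int(line: str) -> Optional[int]:
    """Unary parse function: returns the parsed object or None."""
    return int(line) if line.isdigit() else None


q = queue.Queue()
stream = io.StringIO("42\nhello\n")
reader = threading.Thread(
    target=enqueue_lines_sketch,
    kwargs=dict(q=q, stream=stream, parse_func=parse_int),
)
reader.start()
reader.join()

items = [q.get_nowait() for _ in range(q.qsize())]
print(items)
```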

utopya.task.parse_yaml_dict(line: str, *, start_str: str = '!!map') Union[None, dict][source]#

A yaml parse function that can be passed to enqueue_lines. It only tries parsing the line if it starts with the provided start string.

It tries to decode the line, and parse it as a yaml. If that fails, it will still try to decode the string. If that fails yet again, the unchanged line will be returned.

Parameters
  • line (str) – The line to decode, assumed byte-string, utf8-encoded

  • start_str (str, optional) – The string a line needs to start with in order for parsing to be attempted

Returns

either the decoded dict, or, if that failed:

Return type

Union[None, dict]

class utopya.task.Task(*, name: Optional[str] = None, priority: Optional[float] = None, callbacks: Optional[Dict[str, Callable]] = None, progress_func: Optional[Callable] = None)[source]#

Bases: object

The Task is a container for a task handled by the WorkerManager.

It aims to provide the necessary interfaces for the WorkerManager to easily associate tasks with the corresponding workers and vice versa.

__init__(*, name: Optional[str] = None, priority: Optional[float] = None, callbacks: Optional[Dict[str, Callable]] = None, progress_func: Optional[Callable] = None)[source]#

Initialize a Task object.

Parameters
  • name (str, optional) – The task’s name. If none is given, the generated uuid will be used.

  • priority (float, optional) – The priority of this task; if None, default is +np.inf, i.e. the lowest priority. If two priority values are the same, the task created earlier has a higher priority.

  • callbacks (Dict[str, Callable], optional) – A dict of callback funcs that are called at different points of the life of this task. The function gets passed as only argument this task object.

  • progress_func (Callable, optional) – Invoked by the progress property and used to calculate the progress given the current task object as argument

_name#
_priority#
_uid#
callbacks#
_progress_func#
_stop_conditions#
property name: str#

The task’s name, if given; else the uid.

property uid: int#

The task’s unique ID

property priority: float#

The task’s priority. Default is +inf, which is the lowest priority

property order_tuple: tuple#

Returns the ordering tuple (priority, uid.time)
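
Ordering by such a tuple can be sketched as follows. The class MiniTask is hypothetical, and a simple creation counter replaces the uid's time component for the purpose of this example.

```python
import itertools
import math

# Hypothetical creation counter standing in for the uid's time component
_creation_counter = itertools.count()


class MiniTask:
    def __init__(self, name: str, priority: float = None):
        self.name = name
        # No priority given: use +inf, i.e. the lowest priority
        self.priority = priority if priority is not None else math.inf
        self.created = next(_creation_counter)

    @property
    def order_tuple(self) -> tuple:
        return (self.priority, self.created)


tasks = [MiniTask("c"), MiniTask("a", priority=1), MiniTask("b", priority=1)]
ordered = [t.name for t in sorted(tasks, key=lambda t: t.order_tuple)]
print(ordered)
```

Tuples compare element by element, so the second entry only matters for tasks of equal priority, where the earlier task comes first.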

property progress: float#

If a progress function is given, invokes it; otherwise returns 0

This also performs checks that the progress is in [0, 1]

property fulfilled_stop_conditions: Set[StopCondition]#

The set of fulfilled stop conditions for this task. Typically, this is set by the StopCondition itself as part of its evaluation in its fulfilled() method.

__eq__(other) bool[source]#

Evaluates equality of two tasks: returns true only if identical.

Note

We trust that the unique ID of each task (generated with uuid) is really unique, therefore different tasks can never be fully equivalent.

_invoke_callback(name: str)[source]#

If given, invokes the callback function with the name name.

Note

In order to have higher flexibility, this will not raise errors or warnings if there was no callback function specified with the given name.

class utopya.task.WorkerTask(*, setup_func: Optional[Callable] = None, setup_kwargs: Optional[dict] = None, worker_kwargs: Optional[dict] = None, **task_kwargs)[source]#

Bases: utopya.task.Task

A specialisation of Task for use in the WorkerManager.

It is able to spawn a worker process using subprocess.Popen, executing the task in a non-blocking manner. At the same time, the worker’s stream can be read in via another non-blocking thread and stream information can be parsed. Furthermore, this class provides most of the interface for signalling the spawned process.

For an equivalent class that uses multiprocessing instead of subprocess, see the derived MPProcessTask.

STREAM_PARSE_FUNCS = {'default': None, 'yaml_dict': <function parse_yaml_dict>}#
__init__(*, setup_func: Optional[Callable] = None, setup_kwargs: Optional[dict] = None, worker_kwargs: Optional[dict] = None, **task_kwargs)[source]#

Initialize a WorkerTask.

This is a specialization of Task for use in the WorkerManager.

Parameters
  • setup_func (Callable, optional) – The setup function to use before this task is spawned; this allows handling the worker arguments dynamically. It is called with the worker_kwargs keyword argument, containing the dict passed here. Additionally, setup_kwargs are unpacked into the function call. The function should return a dict that is then used as worker_kwargs for the individual task.

  • setup_kwargs (dict, optional) – The keyword arguments unpacked into the setup_func call.

  • worker_kwargs (dict, optional) – The keyword arguments needed to spawn the worker. Note that these are also passed to setup_func and, if a setup_func is given, the return value of that function will be used for the worker_kwargs.

  • **task_kwargs – Arguments to be passed to __init__(), including the callbacks dictionary among other things.

Raises

ValueError – If neither setup_func nor worker_kwargs were given, thus lacking information on how to spawn the worker.
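
The contract between setup_func, setup_kwargs, and worker_kwargs can be sketched as follows. The functions setup_universe and prepare_worker_kwargs as well as the seed argument are hypothetical and only illustrate the call convention described above.

```python
from typing import Callable, Optional


def setup_universe(*, worker_kwargs: dict, seed: int) -> dict:
    """Hypothetical setup function: appends a seed to the worker args."""
    worker_kwargs = dict(worker_kwargs)
    worker_kwargs["args"] = tuple(worker_kwargs["args"]) + (f"--seed={seed}",)
    return worker_kwargs


def prepare_worker_kwargs(
    *,
    setup_func: Optional[Callable] = None,
    setup_kwargs: Optional[dict] = None,
    worker_kwargs: Optional[dict] = None,
) -> dict:
    if setup_func is None and worker_kwargs is None:
        raise ValueError("Need either setup_func or worker_kwargs!")

    if setup_func is not None:
        # The return value replaces the originally given worker_kwargs
        return setup_func(worker_kwargs=worker_kwargs, **(setup_kwargs or {}))
    return worker_kwargs


result = prepare_worker_kwargs(
    setup_func=setup_universe,
    setup_kwargs={"seed": 42},
    worker_kwargs={"args": ("model",)},
)
print(result)
```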

setup_func#
setup_kwargs#
worker_kwargs#
_worker#
_worker_pid#
_worker_status#
streams#
profiling#
property worker: subprocess.Popen#

The associated worker process object or None, if not yet created.

property worker_pid: int#

The process ID of the associated worker process

property worker_status: Optional[int]#

The worker process’s current status or False, if there is no worker spawned yet.

Note that this invokes a poll to the worker process if one was spawned.

Returns

Current worker status. False, if there was no worker associated yet.

Return type

Union[int, None]

property outstream_objs: list#

Returns the list of objects parsed from the ‘out’ stream

__str__() str[source]#

Return basic WorkerTask information.

spawn_worker() subprocess.Popen[source]#

Spawn a worker process using subprocess.Popen and manage the corresponding queue and thread for reading the stdout stream.

If there is a setup_func, this function will be called first.

Afterwards, from the worker_kwargs returned by that function or from the ones given during initialisation (if no setup_func was given), the worker process is spawned and associated with this task.

Returns

The created process object

Return type

subprocess.Popen

Raises
  • RuntimeError – If a worker was already spawned for this task.

  • TypeError – For invalid args argument

read_streams(stream_names: list = 'all', *, max_num_reads: int = 10, forward_directly: bool = False) None[source]#

Read the streams associated with this task’s worker.

Parameters
  • stream_names (list, optional) – The list of stream names to read. If all (default), will read all streams.

  • max_num_reads (int, optional) –

    How many lines should be read from the buffer. For -1, reads the whole buffer.

    Warning

    Do not make this value too large as it could block the whole reader thread of this worker.

  • forward_directly (bool, optional) – Whether to call the forward_streams() method; this is done before the callback and can be useful if the callback should not happen before the streams are forwarded.

Returns

None

save_streams(stream_names: list = 'all', *, final: bool = False)[source]#

For each stream, checks if it is to be saved, and if yes: saves it.

The saving location is stored in the streams dict. The relevant keys are the save flag and the save_path string.

Note that this function does not save the whole stream log, but only those parts of the stream log that have not already been saved. The position up to which the stream was saved is stored under the lines_saved key in the stream dict.

Parameters
  • stream_names (list, optional) – The list of stream names to _check_. If ‘all’ (default), will check all streams whether the save flag is set.

  • save_raw (bool, optional) – If True, stores the raw log; otherwise stores the regular log, i.e. without the lines that were parseable.

  • final (bool, optional) – If True, this is regarded as the final save operation for the stream, which will lead to additional information being saved to the end of the log.

  • remove_ansi (bool, optional) – If True, will remove ANSI escape characters (e.g. from colored logging) from the log before saving to file.

Returns

None

forward_streams(stream_names: list = 'all', forward_raw: bool = False) bool[source]#

Forwards the streams to stdout, either via logging module or print

This function can be periodically called to forward the part of the stream logs that was not already forwarded to stdout.

The information for that is stored in the stream dict. The log_level entry is used to determine whether the logging module should be used or (in case of None) the print method.

Parameters

stream_names (list, optional) – The list of streams to print

Returns

whether there was any output

Return type

bool

signal_worker(signal: str) tuple[source]#

Sends a signal to this WorkerTask’s worker.

Parameters

signal (str) – The signal to send. Needs to be a valid signal name, i.e. one that is available in Python’s signal module.

Raises

ValueError – When an invalid signal argument was given

Returns

(signal: str, signum: int) sent to the worker

Return type

tuple
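
Resolving a signal name to its number can be sketched with the standard library's signal.Signals enum. Whether utopya resolves names this way or via its own mapping is not stated here; resolve_signal is a hypothetical helper.

```python
import signal
from typing import Tuple


def resolve_signal(name: str) -> Tuple[str, int]:
    try:
        # Signals is an IntEnum that can be indexed by signal name
        signum = int(signal.Signals[name])
    except KeyError as err:
        raise ValueError(f"Invalid signal name '{name}'!") from err
    return name, signum


print(resolve_signal("SIGTERM"))
```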

_prepare_process_args(*, args: tuple, read_stdout: bool, **kwargs) Tuple[tuple, dict][source]#

Prepares the arguments that will be passed to subprocess.Popen

_spawn_process(args, **popen_kwargs)[source]#

This helper takes care only of spawning the actual process and potential error handling.

It can be subclassed to spawn a different kind of process

_spawn_worker(*, args: tuple, popen_kwargs: Optional[dict] = None, read_stdout: bool = True, **_) subprocess.Popen[source]#

Helper function to spawn the worker subprocess

_setup_stream_reader(stream_name: str, *, stream, parser: str = 'default', follow: bool = False, save_streams: bool = False, save_streams_to: Optional[str] = None, save_raw: bool = True, remove_ansi: bool = False, forward_streams: bool = False, forward_raw: bool = True, streams_log_lvl: Optional[int] = None, **_)[source]#

Sets up the stream reader thread

_stop_stream_reader(name: str)[source]#

Stops the stream reader with the given name by closing the associated stream’s file handle.

_finished() None[source]#

Is called once the worker has finished working on this task.

It takes care that a profiling time is saved and that the remaining stream information is logged.

utopya.task._target_wrapper(target, streams: dict, *args, **kwargs)[source]#

A wrapper around the multiprocessing.Process target function which takes care of stream handling.

class utopya.task.PopenMPProcess(args: tuple, kwargs: dict = {}, stdin=None, stdout=None, stderr=None, bufsize: int = - 1, encoding: str = 'utf8')[source]#

Bases: object

A wrapper around multiprocessing.Process that replicates (wide parts of) the interface of subprocess.Popen.

__init__(args: tuple, kwargs: dict = {}, stdin=None, stdout=None, stderr=None, bufsize: int = - 1, encoding: str = 'utf8')[source]#

Creates a multiprocessing.Process and starts it.

The interface here is a subset of subprocess.Popen that makes those features available that make sense for a multiprocessing.Process, mainly: stream reading.

Consequently, the interface is quite a bit different to that of the multiprocessing.Process. The most important arguments of that interface are target, args, and kwargs, which can be set as follows:

  • target will be args[0]

  • args will be args[1:]

  • kwargs is an additional keyword argument that is not part of the subprocess.Popen interface typically.

Regarding the stream arguments, the following steps are done to attach custom pipes: If any argument is a subprocess.PIPE or another stream specifier that is not subprocess.DEVNULL, a new multiprocessing.Pipe and a reader thread will be established.

Warning

This will always use spawn as a start method for the process!

Parameters
  • args (tuple) – The target callable (args[0]) and subsequent positional arguments.

  • kwargs (dict, optional) – Keyword arguments for the target.

  • stdin (None, optional) – The stdin stream

  • stdout (None, optional) – The stdout stream

  • stderr (None, optional) – The stderr stream

  • bufsize (int, optional) – The buffer size to use.

  • encoding (str, optional) – The encoding to use for the streams; should typically remain utf8, using other values is not encouraged!
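
The mapping from the Popen-style args tuple to target, args, and kwargs can be sketched as follows. The function split_popen_style_args is a hypothetical stand-in that only mirrors the documented convention; it does not start a process or handle streams.

```python
from typing import Callable, Optional, Tuple


def split_popen_style_args(
    args: tuple, kwargs: Optional[dict] = None
) -> Tuple[Callable, tuple, dict]:
    """Split Popen-style ``args`` into (target, positional args, kwargs).

    Hypothetical helper: args[0] is the target, args[1:] its arguments.
    """
    if not args or not callable(args[0]):
        raise TypeError("args[0] needs to be the target callable")
    target, positional = args[0], tuple(args[1:])
    # Copy the kwargs so that later changes by the caller have no effect
    return target, positional, dict(kwargs or {})


target, positional, keyword = split_popen_style_args(
    (sorted, [3, 1, 2]), {"reverse": True}
)
result = target(*positional, **keyword)
```

Here the target is called directly; in PopenMPProcess it would instead be handed to a multiprocessing.Process.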

_prepare_target_args(args: tuple, *, stdin, stdout, stderr) Tuple[Callable, tuple][source]#

Prepares the target callable and stream objects

__del__()[source]#

Custom destructor that closes the process and file descriptors

poll() Optional[int][source]#

Check if child process has terminated. Set and return returncode attribute. Otherwise, returns None.

With the underlying process being a multiprocessing.Process, this method is equivalent to the returncode property.

wait(timeout=None)[source]#

Wait for the process to finish; blocking call.

This method is not yet implemented, but will be!

communicate(input=None, timeout=None)[source]#

Communicate with the process.

This method is not yet implemented! Not sure if it will be …

send_signal(signal: int)[source]#

Send a signal to the process. Only works for SIGKILL and SIGTERM.

terminate()[source]#

Sends SIGTERM to the process

kill()[source]#

Sends SIGKILL to the process

property args: tuple#

The args argument to this process. Note that the returned tuple includes the target callable as its first entry.

Note that these have already been passed to the process; changing them has no effect.

property kwargs#

Keyword arguments passed to the target callable.

Note that these have already been passed to the process; changing them has no effect.

property stdin#

The attached stdin stream

property stdout#

The attached stdout stream

property stderr#

The attached stderr stream

property pid#

Process ID of the child process

property returncode: Optional[int]#

The child return code, set by poll() and wait() (and indirectly by communicate()). A None value indicates that the process hasn’t terminated yet.

A negative value -N indicates that the child was terminated by signal N (POSIX only).
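
A short sketch of how a caller might interpret the three cases of returncode (None, negative, non-negative). The helper describe_returncode is an assumption made for illustration and is not part of utopya.

```python
import signal
from typing import Optional


def describe_returncode(returncode: Optional[int]) -> str:
    """Translate a Popen-style return code into a readable description."""
    if returncode is None:
        return "still running"
    if returncode < 0:
        # A value of -N means: terminated by signal N (POSIX only)
        try:
            name = signal.Signals(-returncode).name
        except ValueError:
            name = f"signal {-returncode}"
        return f"terminated by {name}"
    return f"exited with code {returncode}"


status = describe_returncode(-int(signal.SIGTERM))
```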

class utopya.task.MPProcessTask(*, setup_func: Optional[Callable] = None, setup_kwargs: Optional[dict] = None, worker_kwargs: Optional[dict] = None, **task_kwargs)[source]#

Bases: utopya.task.WorkerTask

A WorkerTask specialization that uses multiprocessing.Process instead of subprocess.Popen.

It is mostly equivalent to WorkerTask but adjusts the private methods that take care of spawning the actual process and setting up the stream readers, such that the particularities of the PopenMPProcess wrapper are accounted for.

_spawn_process(args, **popen_kwargs) utopya.task.PopenMPProcess[source]#

This helper takes care only of spawning the actual process and potential error handling. It returns a PopenMPProcess instance, which has the same interface as subprocess.Popen.

_setup_stream_reader(*args, **kwargs)[source]#

Sets up the stream reader with follow=True, such that the file-like streams that PopenMPProcess uses can be read properly.

_stop_stream_reader(name: str)[source]#

Stops the stream reader thread with the given name by telling its follow function to stop, thus ending iteration.

setup_func#
setup_kwargs#
worker_kwargs#
_worker#
_worker_pid#
_worker_status#
streams#
profiling#

class utopya.task.TaskList[source]#

Bases: object

The TaskList stores Task objects, ensures that none is contained twice, and can be locked to prevent new tasks from being added.

__init__()[source]#

Initialize an empty TaskList.

__len__() int[source]#

The length of the TaskList.

__contains__(val: utopya.task.Task) bool[source]#

Checks if the given object is contained in this TaskList.

__getitem__(idx: int) utopya.task.Task[source]#

Returns the item at the given index in the TaskList.

__iter__()[source]#

Iterate over the TaskList

__eq__(other) bool[source]#

Tests for equality of the task list by forwarding to _l attribute

lock()[source]#

If called, the TaskList becomes locked and allows no further calls to the append method.

append(val: utopya.task.Task)[source]#

Append a Task object to this TaskList

Parameters

val (Task) – The task to add

Raises
__add__(tasks: Sequence[utopya.task.Task])[source]#

Appends all the tasks in the given iterable to the task list
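
The behaviour described for TaskList (no duplicates, lockable) can be sketched with a small wrapper around a list. The class name LockableUniqueList and the exception types are assumptions; which errors TaskList itself raises is not stated above.

```python
class LockableUniqueList:
    """A list wrapper that rejects duplicates and can be locked.

    Illustrative sketch only; exception types are assumptions.
    """

    def __init__(self):
        self._l = []
        self._locked = False

    def __len__(self) -> int:
        return len(self._l)

    def __contains__(self, val) -> bool:
        return val in self._l

    def __getitem__(self, idx: int):
        return self._l[idx]

    def __iter__(self):
        return iter(self._l)

    def lock(self) -> None:
        """After this call, append() no longer accepts new items."""
        self._locked = True

    def append(self, val) -> None:
        if self._locked:
            raise RuntimeError("List is locked; cannot append")
        if val in self:
            raise ValueError(f"Item {val!r} is already contained")
        self._l.append(val)


tasks = LockableUniqueList()
tasks.append("task_a")
tasks.append("task_b")
tasks.lock()
```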

utopya.testtools module#

Tools that help testing models.

This mainly supplies the ModelTest class, which is a specialization of the Model for usage in tests.

class utopya.testtools.ModelTest(model_name: str, *, test_file: Optional[str] = None, use_tmpdir: bool = True, **kwargs)[source]#

Bases: utopya.model.Model

A class to use for testing Utopia models.

It attaches to a certain model and makes it easy to load the config files with which tests should be carried out.

__init__(model_name: str, *, test_file: Optional[str] = None, use_tmpdir: bool = True, **kwargs)[source]#

Initialize the ModelTest class for the given model name.

This is basically like the base class __init__ just that it sets the default value of use_tmpdir to True and renames TODO

Parameters
  • model_name (str) – Name of the model to test

  • test_file (str, optional) – The file this ModelTest is used in. If given, will look for config files relative to the folder this file is located in.

  • use_tmpdir (bool, optional) – Whether to use a temporary directory to write data to. The default value can be set here; but the flag can be overwritten in the create_mv and create_run_load methods. For false, the regular model output directory is used.

Raises

ValueError – If the directory extracted from test_file is invalid

utopya.tools module#

Implements generally useful functions, partly by importing from dantro.tools

utopya.tools.load_selected_keys(src: dict, *, add_to: dict, keys: Sequence[Tuple[str, type, bool]], err_msg_prefix: Optional[str] = None, prohibit_unexpected: bool = True) None[source]#

Loads (only) selected keys from dict src into dict add_to.

Parameters
  • src (dict) – The dict to load values from

  • add_to (dict) – The dict to load values into

  • keys (Sequence[Tuple[str, type, bool]]) – Which keys to load, given as sequence of (key, allowed types, [required=False]) tuples.

  • err_msg_prefix (str) – A description string, used in error message

  • prohibit_unexpected (bool, optional) – Whether to raise on keys that were unexpected, i.e. not given in keys argument.

Raises
utopya.tools.add_item(value, *, add_to: dict, key_path: Sequence[str], value_func: Optional[Callable] = None, is_valid: Optional[Callable] = None, ErrorMsg: Optional[Callable] = None) None[source]#

Adds the given value to the add_to dict, traversing the given key path. This operation happens in-place.

Parameters
  • value – The value of what is to be stored. If this is a callable, the result of the call is stored.

  • add_to (dict) – The dict to add the entry to

  • key_path (Sequence[str]) – The path at which to add it

  • value_func (Callable, optional) – If given, calls it with value as argument and uses the return value to add to the dict

  • is_valid (Callable, optional) – Used to determine whether value is valid or not; should take single positional argument, return bool

  • ErrorMsg (Callable, optional) – A raisable object that prints an error message; gets passed value as positional argument.

Raises

Exception – type depends on specified ErrorMsg callable
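
The key-path traversal can be sketched as below. This is a simplified re-implementation under assumptions: the name add_item_sketch is hypothetical, validation is applied after value_func, ValueError is used when no ErrorMsg is given, and the handling of a callable value is omitted. The actual add_item may order or handle these steps differently.

```python
from typing import Any, Callable, Optional, Sequence


def add_item_sketch(
    value: Any,
    *,
    add_to: dict,
    key_path: Sequence[str],
    value_func: Optional[Callable] = None,
    is_valid: Optional[Callable] = None,
    ErrorMsg: Optional[Callable] = None,
) -> None:
    """Store ``value`` in ``add_to`` at the nested ``key_path``, in-place."""
    if value_func is not None:
        value = value_func(value)
    if is_valid is not None and not is_valid(value):
        # Assumption: fall back to ValueError if no ErrorMsg is given
        raise (ErrorMsg or ValueError)(value)

    # Walk down the key path, creating intermediate dicts where needed
    d = add_to
    for key in key_path[:-1]:
        d = d.setdefault(key, {})
    d[key_path[-1]] = value


cfg = {"run": {"seed": 42}}
add_item_sketch(3, add_to=cfg, key_path=("run", "steps"), value_func=lambda v: v * 2)
```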

utopya.tools.pprint(obj: Any, **kwargs)[source]#

Prints a “pretty” string representation of the given object.

Parameters
  • obj (Any) – The object to print

  • **kwargs – Passed to print

utopya.tools.pformat(obj: Any) str[source]#

Creates a “pretty” string representation of the given object.

This is achieved by creating a yaml representation.

Todo

Improve parsing of leaf-level mappings

utopya.tools.ensure_not_None(d: typing.Optional[typing.Any], fallback: typing.Union[type, typing.Callable] = <class 'dict'>) Any[source]#

Returns d if it is not None, otherwise creates a new object by calling fallback without any arguments.
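
A minimal sketch of the described behaviour, written as a standalone function (ensure_not_none_sketch is a hypothetical name). Note that only None triggers the fallback; other falsy values such as an empty list are returned unchanged.

```python
from typing import Any, Callable


def ensure_not_none_sketch(d: Any, fallback: Callable = dict) -> Any:
    """Return ``d`` unless it is None; then return ``fallback()``."""
    return d if d is not None else fallback()


def collect(item, into=None):
    # Typical use: avoid a mutable default argument
    into = ensure_not_none_sketch(into, list)
    into.append(item)
    return into


first = collect(1)
second = collect(2)
```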

utopya.tools.ensure_dict(d: Optional[dict]) dict[source]#
utopya.tools.parse_si_multiplier(s: str) int[source]#

Parses a string like 1.23M or -2.34 k into an integer.

If it is a string, parses the SI multiplier and returns the appropriate integer for use as number of simulation steps. Supported multipliers are k, M, G and T. These need to be used as the suffix of the string.

Note

This is only intended to be used with integer values and does not support float values like 1u.

The used regex can be found here.

Parameters

s (str) – A string representing an integer number, potentially including a supported SI multiplier as suffix.

Returns

The parsed number of steps as integer. If the value has decimal places, integer rounding is applied.

Return type

int

Raises

ValueError – Upon string that does not match the expected pattern
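
The parsing can be sketched as follows. The regular expression and the function name parse_si_sketch are assumptions and not the pattern utopya uses; rounding is done by truncation towards zero here, which may differ from the actual implementation.

```python
import re
from decimal import Decimal

# Supported suffixes; the empty string stands for "no multiplier"
_SI_MULTIPLIERS = {"": 1, "k": 10**3, "M": 10**6, "G": 10**9, "T": 10**12}

# Assumed pattern: optional sign, digits, optional decimals, optional suffix
_PATTERN = re.compile(r"^\s*([+-]?\d+(?:\.\d+)?)\s*([kMGT]?)\s*$")


def parse_si_sketch(s: str) -> int:
    """Parse strings like "1.23M" or "-2.34 k" into an integer."""
    match = _PATTERN.match(s)
    if match is None:
        raise ValueError(f"Cannot parse {s!r} as number with SI multiplier")
    number, suffix = match.groups()
    # Decimal avoids binary floating-point artifacts before truncation
    return int(Decimal(number) * _SI_MULTIPLIERS[suffix])


steps = parse_si_sketch("1.23M")
```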

utopya.tools.parse_num_steps(N: Union[str, int], *, raise_if_negative: bool = True) int[source]#

Given a string like 1.23M or an integer, prepares the num_steps argument for a single universe simulation.

For string arguments, uses parse_si_multiplier() for string parsing. If that fails, attempts to read it in float notation by calling int(float(N)).

Note

This function always applies integer rounding.

Parameters
  • N (Union[str, int]) – The num_steps argument as a string or integer.

  • raise_if_negative (bool, optional) – Whether to raise an error if the value is negative.

Returns

The parsed value for num_steps

Return type

int

Raises

ValueError – Result invalid, i.e. not parseable or of negative value.
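
The fallback chain described above (SI parsing first, then int(float(N))) might look like the following sketch. The name parse_steps_sketch and the si_parser argument are hypothetical; in the example, the built-in int stands in for parse_si_multiplier() so that the block stays self-contained.

```python
from typing import Callable, Union


def parse_steps_sketch(
    N: Union[str, int],
    *,
    si_parser: Callable[[str], int],
    raise_if_negative: bool = True,
) -> int:
    """Turn a string or integer into a number of steps."""
    if isinstance(N, str):
        try:
            value = si_parser(N)
        except ValueError:
            # Fallback: float notation such as "1e3" or "2.5e2"
            try:
                value = int(float(N))
            except (ValueError, OverflowError) as err:
                raise ValueError(f"Cannot parse {N!r} as num_steps") from err
    else:
        value = int(N)

    if raise_if_negative and value < 0:
        raise ValueError(f"num_steps needs to be non-negative, got {value}")
    return value


# int("1e3") raises ValueError, so the float fallback is used
num_steps = parse_steps_sketch("1e3", si_parser=int)
```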

utopya.workermanager module#

The WorkerManager is a central part of utopya in that it spawns and controls the tasks (WorkerTask) that are to be worked on.

utopya.workermanager.STOPCOND_EXIT_CODES: Sequence[int] = (-10, 10, 138)#

Exit codes of a WorkerTask that will be interpreted as stemming from a stop condition. This depends on the signal used for stop conditions (utopya.stop_conditions.SIG_STOPCOND). This sequence of possible exit codes takes into account that the sign may be switched (depending on whether a signed or unsigned integer convention is used) or where a convention is used such that a handled signal is turned into an exit code of 128 + abs(signum).
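
The three conventions mentioned above can be written out for a given signal number. This is a sketch, assuming the stop-condition signal has the number 10 (SIGUSR1 in the SIGMAP listed for utopya._signal), which is what the listed values suggest; the helper name is hypothetical.

```python
def exit_codes_for_signal(signum: int) -> tuple:
    """Exit codes that may result from a process ending due to ``signum``.

    Covers the negative convention, the unsigned one, and 128 + abs(signum).
    """
    n = abs(signum)
    return (-n, n, 128 + n)


codes = exit_codes_for_signal(10)
```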

class utopya.workermanager.WorkerManager(num_workers: typing.Union[int, str] = 'auto', poll_delay: float = 0.05, lines_per_poll: int = 20, periodic_task_callback: typing.Optional[int] = None, QueueCls: type = <class 'queue.Queue'>, reporter: typing.Optional[utopya.reporter.WorkerManagerReporter] = None, rf_spec: typing.Optional[typing.Dict[str, typing.Union[str, typing.List[str]]]] = None, save_streams_on: typing.Sequence[str] = (), nonzero_exit_handling: str = 'ignore', interrupt_params: typing.Optional[dict] = None, cluster_mode: bool = False, resolved_cluster_params: typing.Optional[dict] = None)[source]#

Bases: object

The WorkerManager class orchestrates WorkerTask objects: setting them up, invoking them, tracking their progress, and starting new workers if previous workers finished.

Attrs:
rf_spec (dict): The report format specifications that are used throughout the WorkerManager. These are invoked at different points of the operation of the WorkerManager: while_working, after_work, after_abort, task_spawn, task_finished.

__init__(num_workers: typing.Union[int, str] = 'auto', poll_delay: float = 0.05, lines_per_poll: int = 20, periodic_task_callback: typing.Optional[int] = None, QueueCls: type = <class 'queue.Queue'>, reporter: typing.Optional[utopya.reporter.WorkerManagerReporter] = None, rf_spec: typing.Optional[typing.Dict[str, typing.Union[str, typing.List[str]]]] = None, save_streams_on: typing.Sequence[str] = (), nonzero_exit_handling: str = 'ignore', interrupt_params: typing.Optional[dict] = None, cluster_mode: bool = False, resolved_cluster_params: typing.Optional[dict] = None)[source]#

Initialize the worker manager.

Parameters
  • num_workers (Union[int, str], optional) – The number of workers that can work in parallel. If ‘auto’ (default), uses os.cpu_count(). If below zero, subtracts abs(num_workers) from the CPU count.

  • poll_delay (float, optional) – How long (in seconds) the delay between worker polls should be. For too small delays (<0.01), the CPU load will become significant.

  • lines_per_poll (int, optional) – How many lines to read from each stream during polling of the tasks. This value should not be too large, otherwise the polling is delayed by too much. By setting it to -1, all available lines are read.

  • periodic_task_callback (int, optional) – If given, an additional task callback will be invoked after every periodic_task_callback poll events.

  • QueueCls (type, optional) – Which class to use for the queue. Defaults to the FiFo queue.Queue.

  • reporter (WorkerManagerReporter, optional) – The reporter associated with this WorkerManager, reporting on the progress.

  • rf_spec (Dict[str, Union[str, List[str]]], optional) –

    The names of report formats that should be invoked at different points of the WorkerManager’s operation. Possible keys: before_working, while_working, after_work, after_abort, task_spawn, task_finished. All other keys are ignored.

    The values of the dict can be either strings or lists of strings, where the strings always refer to report formats registered with the WorkerManagerReporter. This argument updates the default report format specifications.

  • save_streams_on (Sequence[str], optional) – On which events to invoke save_streams() during work. Should be a sequence containing one or both of the keys on_monitor_update, periodic_callback.

  • nonzero_exit_handling (str, optional) – How to react if a WorkerTask exits with a non-zero exit code. For ‘ignore’, nothing happens. For ‘warn’, a warning is printed and the last 5 lines of the log are shown. For ‘raise’, the last 20 lines of the log are shown, all other tasks are terminated, and the WorkerManager exits with the same exit code as the WorkerTask exited with. Note that ‘warn’ will not lead to any messages if the worker died by SIGTERM, which presumably originated from a fulfilled stop condition. Use ‘warn_all’ to also receive warnings in this case.

  • interrupt_params (dict, optional) –

    Parameters that determine how the WorkerManager behaves when receiving KeyboardInterrupts during working. Possible keys:

    send_signal: Which signal to send to the workers. Can be SIGINT (default), SIGTERM, SIGKILL, or any valid signal as integer.

    grace_period: How long to wait for the other workers to gracefully shut down. After this period (in seconds), the workers will be killed via SIGKILL. Default is 5s.

    exit: Whether to sys.exit at the end of start_working. Default is True.

  • cluster_mode (bool, optional) – Whether similar tasks to those that are managed by this WorkerManager are, at the same time, worked on by other WorkerManager instances. This is relevant because the output of files might be affected by whether another WorkerManager instance is currently working on the same output directory. Also, in the future, this argument might be used to communicate between nodes.

  • resolved_cluster_params (dict, optional) – The corresponding cluster parameters.

Raises

ValueError – If the num_workers argument is too negative
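
How the num_workers argument might be resolved can be sketched as follows. The function resolve_num_workers and its cpu_count argument are hypothetical, and treating a result below one as an error is an assumption about what counts as a too negative value.

```python
import os
from typing import Optional, Union


def resolve_num_workers(
    num_workers: Union[int, str], *, cpu_count: Optional[int] = None
) -> int:
    """Resolve 'auto' or a (possibly negative) integer to a worker count."""
    if cpu_count is None:
        # os.cpu_count() may return None; fall back to a single worker
        cpu_count = os.cpu_count() or 1

    if num_workers == "auto":
        return cpu_count
    if not isinstance(num_workers, int):
        raise ValueError(f"Invalid num_workers argument: {num_workers!r}")

    if num_workers < 0:
        # Negative values leave abs(num_workers) CPUs unused
        num_workers = cpu_count + num_workers
    if num_workers < 1:
        raise ValueError("Need at least one worker")
    return num_workers


workers = resolve_num_workers(-2, cpu_count=8)
```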

pending_exceptions: queue.Queue = None#

A (FiFo) queue of Exception objects that will be handled by the WorkerManager during working. This is the interface that allows for other threads that have access to the WorkerManager to add an exception and let it be handled in the main thread.

rf_spec: dict = None#

The report format specifications that are used throughout the WorkerManager. These are invoked at different points of the operation:

  • while_working

  • after_work

  • after_abort

  • task_spawn

  • task_finished

times: dict = None#

Holds profiling information

property tasks: utopya.task.TaskList#

The list of all tasks.

property task_queue: queue.Queue#

The task queue.

property task_count: int#

Returns the number of tasks that this manager ever took care of. Careful: This is NOT the current number of tasks in the queue!

property num_workers: int#

The number of workers that may work in parallel

property active_tasks: List[utopya.task.WorkerTask]#

The list of currently active tasks.

Note that this information might not be up-to-date; a process might quit just after the list has been updated.

property num_finished_tasks: int#

The number of finished tasks. Incremented whenever a task leaves the active_tasks list, regardless of its exit status.

property num_free_workers: int#

Returns the number of free workers.

property poll_delay: float#

Returns the delay between two polls

property stop_conditions: Set[utopya.stop_conditions.StopCondition]#

All stop conditions that were ever passed to start_working() during the life time of this WorkerManager.

property nonzero_exit_handling: str#

Behavior upon a worker exiting with a non-zero exit code.

  • with ignore, nothing happens

  • with warn, a warning is printed

  • with raise, the log is shown and the WorkerManager exits with the same exit code as the corresponding WorkerTask exited with.

property reporter: Optional[utopya.reporter.WorkerManagerReporter]#

The associated WorkerManagerReporter or None, if no reporter is set.

property cluster_mode: bool#

Returns whether the WorkerManager is in cluster mode

property resolved_cluster_params: dict#

Returns a copy of the cluster configuration with all parameters resolved (thus making some additional keys available on the top level). It is returned as a deep copy to avoid mutability issues.

add_task(*, TaskCls: type = <class 'utopya.task.WorkerTask'>, **task_kwargs) utopya.task.WorkerTask[source]#

Adds a task to the WorkerManager.

Parameters
  • TaskCls (type, optional) – The WorkerTask-like type to use

  • **task_kwargs – All arguments needed for WorkerTask initialization. See utopya.task.WorkerTask for all valid arguments.

Returns

The created WorkerTask object

Return type

WorkerTask

start_working(*, detach: bool = False, timeout: Optional[float] = None, stop_conditions: Optional[Sequence[utopya.stop_conditions.StopCondition]] = None, post_poll_func: Optional[Callable] = None) None[source]#

Upon call, all enqueued tasks will be worked on sequentially.

Parameters
  • detach (bool, optional) – If False (default), the WorkerManager will block here, as it continuously polls the workers and distributes tasks.

  • timeout (float, optional) – If given, the number of seconds this work session is allowed to take. Workers will be aborted if the number is exceeded. Note that this is not measured in CPU time, but in the host system’s wall time.

  • stop_conditions (Sequence[StopCondition], optional) – During the run these StopCondition objects will be checked

  • post_poll_func (Callable, optional) – If given, this is called after all workers have been polled. It can be used to perform custom actions during the polling loop.

Raises
_invoke_report(rf_spec_name: str, *args, **kwargs)[source]#

Helper function to invoke the reporter’s report function

_grab_task() utopya.task.WorkerTask[source]#

Will initiate that a task is gotten from the queue and that it spawns its worker process.

Returns

The WorkerTask grabbed from the queue.

Return type

WorkerTask

Raises

queue.Empty – If the task queue was empty

_poll_workers() None[source]#

Will poll all workers that are in the working list and remove them from that list if they are no longer alive.
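
The polling idea can be illustrated with plain subprocess.Popen objects. This is a generic sketch, not the WorkerManager's actual loop; poll_until_done is a hypothetical name and the default delay merely mirrors the documented poll_delay default.

```python
import subprocess
import sys
import time


def poll_until_done(procs, poll_delay: float = 0.05) -> dict:
    """Poll processes until all have finished; return exit codes by PID."""
    active = list(procs)
    exit_codes = {}
    while active:
        # Iterate over a copy, as finished processes are removed
        for proc in list(active):
            returncode = proc.poll()
            if returncode is not None:
                exit_codes[proc.pid] = returncode
                active.remove(proc)
        if active:
            # Sleeping between polls keeps the CPU load low
            time.sleep(poll_delay)
    return exit_codes


procs = [
    subprocess.Popen([sys.executable, "-c", f"import sys; sys.exit({code})"])
    for code in (0, 3)
]
exit_codes = poll_until_done(procs)
```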

_check_stop_conds(stop_conds: Sequence[utopya.stop_conditions.StopCondition]) Set[utopya.task.WorkerTask][source]#

Checks the given stop conditions for the active tasks and compiles a list of tasks that need to be terminated.

Parameters

stop_conds (Sequence[StopCondition]) – The stop conditions that are to be checked.

Returns

The WorkerTasks whose workers need to be terminated

Return type

Set[WorkerTask]

_invoke_periodic_callbacks()[source]#

Invokes the periodic callback function of each active task.

_signal_workers(tasks: Union[str, List[utopya.task.WorkerTask]], *, signal: Union[str, int]) None[source]#

Send signals to a list of WorkerTasks.

Parameters
  • tasks (Union[str, List[WorkerTask]]) – strings ‘all’ or ‘active’ or a list of WorkerTasks to signal

  • signal (Union[str, int]) – The signal to send

_handle_pending_exceptions() None[source]#

This method handles the list of pending exceptions during working, starting from the one added most recently.

As the WorkerManager occupies the main thread, it is difficult for other threads to signal to the WorkerManager that an exception occurred. The pending_exceptions attribute allows such a handling; child threads can just add an exception object to it and they get handled during working of the WorkerManager.

This method handles the following exception types in a specific manner:

  • WorkerTaskStopConditionFulfilled: never raising or logging

  • WorkerTaskNonZeroExit: raising or logging depending on the value of the nonzero_exit_handling property

Returns

None

Raises

Exception – The exception that was added first to the queue of pending exceptions
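
The hand-over of exceptions from child threads to the main thread can be sketched with a standard queue.Queue. All names below are hypothetical; the sketch only drains the queue and leaves the decision to raise or log to the caller.

```python
import queue
import threading

pending_exceptions = queue.Queue()


def worker_thread() -> None:
    # A child thread cannot raise into the main thread; it enqueues instead
    pending_exceptions.put(RuntimeError("task exited with non-zero code"))


def drain_pending(pending: queue.Queue) -> list:
    """Take all currently pending exceptions off the queue."""
    drained = []
    while True:
        try:
            drained.append(pending.get_nowait())
        except queue.Empty:
            break
    return drained


thread = threading.Thread(target=worker_thread)
thread.start()
thread.join()

# In the main thread: inspect, log, or re-raise the collected exceptions
handled = drain_pending(pending_exceptions)
```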

utopya.yaml module#

Takes care of the YAML setup for Utopya.

In the module import order, this module needs to be downstream from all modules that implement objects that require a custom YAML representation.

utopya.yaml._scalar_node_to_object(loader, node)[source]#

Attempts to convert the given scalar node to a null (Python None), a bool, an int, or a float object using the corresponding YAML constructor. If those conversions fail, constructs a scalar (which will typically result in a string being returned).

utopya.yaml._expr_constructor(loader, node)[source]#

Custom pyyaml constructor for evaluating strings with simple mathematical expressions.

Supports: +, -, *, **, /, e-X, eX, inf, nan
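
To illustrate what evaluating such simple expressions involves, the sketch below walks the syntax tree of an expression string with the standard ast module. This is not how the constructor is implemented; the function name and the restriction to the four basic operators, scientific notation, inf, and nan are assumptions chosen for the example.

```python
import ast
import math
import operator

_BIN_OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
}
_UNARY_OPS = {ast.UAdd: operator.pos, ast.USub: operator.neg}
_NAMES = {"inf": math.inf, "nan": math.nan}


def eval_simple_expr(expr: str):
    """Evaluate a simple arithmetic expression without using eval()."""

    def _eval(node):
        if isinstance(node, ast.Constant):
            # Only plain numbers are allowed; this excludes bools and strings
            if type(node.value) in (int, float):
                return node.value
        elif isinstance(node, ast.BinOp) and type(node.op) in _BIN_OPS:
            return _BIN_OPS[type(node.op)](_eval(node.left), _eval(node.right))
        elif isinstance(node, ast.UnaryOp) and type(node.op) in _UNARY_OPS:
            return _UNARY_OPS[type(node.op)](_eval(node.operand))
        elif isinstance(node, ast.Name) and node.id in _NAMES:
            return _NAMES[node.id]
        raise ValueError(f"Unsupported expression: {expr!r}")

    return _eval(ast.parse(expr.strip(), mode="eval").body)


value = eval_simple_expr("1 + 2 * 3")
```

Anything outside the whitelisted node types, such as a function call, is rejected with a ValueError.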

utopya.yaml._func_on_sequence_constructor(loader, node, *, func: Callable)[source]#

Custom yaml constructor that constructs a sequence, passes it to the given function, and returns the result of that call.

Can be used e.g. in conjunction with the any and all functions, evaluating sequences of booleans.

utopya.yaml._parameter_shorthand_constructor(loader, node) utopya.parameter.Parameter[source]#

Constructs a Parameter object from a scalar YAML node using _scalar_node_to_object().

The YAML tag is used as shorthand mode argument to the from_shorthand class method.