utopya package#
The utopya package provides a simulation management and evaluation framework with the following components:

- A registry framework for models (model_registry) and projects (ProjectRegistry)
- A configuration manager and simulation runner, the Multiverse:
  - Contains a multi-level configuration interface
  - Parallel simulation execution via WorkerManager
- Coupling to the dantro data evaluation pipeline, integrated via utopya.eval:
  - Custom data groups and containers
  - An eval.plotmanager.PlotManager that takes into account project- or model-specific plot function definitions
- The model.Model abstraction, which allows convenient interactive work with utopya and registered models
- The testtools.ModelTest class, containing specializations that make it more convenient to implement model tests using utopya
- Batch simulation running and evaluation via batch

For a real-world example of how utopya can be integrated, have a look at the Utopia modelling framework, which uses utopya as its frontend.
For model implementations, the utopya_backend package can assist in building Python-based models that use utopya as a frontend.
Also visit the user manual front page for more information.
Subpackages#
- utopya._yaml_registry package
- utopya.eval package
- Subpackages
- utopya.eval.plots package
- Submodules
- utopya.eval.plots._attractor module
- utopya.eval.plots._graph module
- utopya.eval.plots._mpl module
- utopya.eval.plots._utils module
- utopya.eval.plots.abm module
- utopya.eval.plots.attractor module
- utopya.eval.plots.ca module
- utopya.eval.plots.distributions module
- utopya.eval.plots.graph module
- utopya.eval.plots.snsplot module
- utopya.eval.plots.time_series module
- Submodules
- utopya.eval._plot_func_resolver module
- utopya.eval.containers module
- utopya.eval.data_ops module
- utopya.eval.datamanager module
- utopya.eval.groups module
- utopya.eval.plotcreators module
- utopya.eval.plothelper module
- utopya.eval.plotmanager module
- utopya.eval.transform module
- utopya.model_registry package
Submodules#
utopya._cluster module#
This module holds functions used in the Multiverse’s cluster mode.
- utopya._cluster.parse_node_list(node_list_str: str, *, mode: str, rcps: dict) List[str] [source]#
Parses the node list to a list of node names and checks against the given resolved cluster parameters.
Depending on mode, different forms of the node list are parsable. For condensed mode:

    node042
    node[002,004-011,016]
    m05s[0204,0402,0504]
    m05s[0204,0402,0504],m08s[0504,0604,0701],m13s0603,m14s[0501-0502]
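The condensed node-list syntax shown above can be expanded with a short helper. Below is a minimal pure-Python sketch of the expansion step only (a hypothetical expand_condensed helper, not utopya's actual implementation; it also skips the check against the resolved cluster parameters that parse_node_list performs):

```python
import re
from typing import List

def expand_condensed(node_list_str: str) -> List[str]:
    """Expand a condensed node list like 'node[002,004-006],m13s0603'."""
    nodes = []
    # Split on commas that are *not* inside square brackets
    for part in re.split(r",(?![^\[]*\])", node_list_str):
        m = re.fullmatch(r"([^\[]+)\[([^\]]+)\]", part)
        if not m:
            nodes.append(part)  # plain node name, e.g. 'node042'
            continue
        prefix, spec = m.groups()
        for item in spec.split(","):
            if "-" in item:  # a zero-padded range like '004-011'
                start, end = item.split("-")
                width = len(start)
                nodes += [f"{prefix}{i:0{width}d}"
                          for i in range(int(start), int(end) + 1)]
            else:
                nodes.append(prefix + item)
    return nodes
```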
utopya._import_tools module#
Helper module that contains tools useful for module imports or manipulation
of the system path.
These are not implemented here but in dantro._import_tools.
Deprecated since version 1.0.1: This module is deprecated. Use dantro._import_tools or utopya_backend.tools.import_package_from_dir() instead.
utopya._signal module#
Implements signalling-related functionality and globally relevant data
- utopya._signal.SIGMAP = {'SIGABRT': 6, 'SIGALRM': 14, 'SIGBUS': 7, 'SIGCHLD': 17, 'SIGCLD': 17, 'SIGCONT': 18, 'SIGFPE': 8, 'SIGHUP': 1, 'SIGILL': 4, 'SIGINT': 2, 'SIGIO': 29, 'SIGIOT': 6, 'SIGKILL': 9, 'SIGPIPE': 13, 'SIGPOLL': 29, 'SIGPROF': 27, 'SIGPWR': 30, 'SIGQUIT': 3, 'SIGRTMAX': 64, 'SIGRTMIN': 34, 'SIGSEGV': 11, 'SIGSTOP': 19, 'SIGSYS': 31, 'SIGTERM': 15, 'SIGTRAP': 5, 'SIGTSTP': 20, 'SIGTTIN': 21, 'SIGTTOU': 22, 'SIGURG': 23, 'SIGUSR1': 10, 'SIGUSR2': 12, 'SIGVTALRM': 26, 'SIGWINCH': 28, 'SIGXCPU': 24, 'SIGXFSZ': 25, 'SIG_BLOCK': 0, 'SIG_DFL': 0, 'SIG_IGN': 1, 'SIG_SETMASK': 2, 'SIG_UNBLOCK': 1}#
A map from signal names to corresponding integer exit codes
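A similar name-to-number map can be reconstructed from the standard library; a minimal sketch, assuming a POSIX platform (the exact set and values of available signals vary by system, and this omits the SIG_* mask constants that SIGMAP includes):

```python
import signal

# Build a name -> number map similar to utopya._signal.SIGMAP from
# the attributes of the standard library's signal module
sigmap = {
    name: int(getattr(signal, name))
    for name in dir(signal)
    if name.startswith("SIG") and not name.startswith("SIG_")
}

# Such a map lets configuration files refer to signals by name,
# e.g. when specifying which signal to send to interrupt workers
signum = sigmap["SIGTERM"]
```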
utopya._yaml module#
Supplies basic YAML interface, inherited from yayaml
utopya.batch module#
Implements batch running and evaluation of simulations
- utopya.batch._BTM_BASE_CFG_PATH = '/home/docs/checkouts/readthedocs.org/user_builds/utopya/checkouts/latest/utopya/cfg/btm_cfg.yml'#
Base configuration path of the batch task manager
- utopya.batch._BTM_BASE_CFG = {'cluster_mode': False, 'debug': False, 'parallelization_level': 'batch', 'paths': {'note': None, 'out_dir': '~/utopya_output/_batch'}, 'reporter': {'report_formats': {'progress_bar': {'info_fstr': '{total_progress:>5.1f}% ({cnt[finished]} / {cnt[total]})', 'min_report_intv': 0.5, 'num_cols': 'adaptive', 'parser': 'progress_bar', 'show_times': True, 'times_fstr': '| {elapsed} elapsed', 'times_fstr_final': '| finished in {elapsed:} ', 'times_kwargs': {'mode': 'from_start'}, 'write_to': 'stdout_noreturn'}, 'report_file': {'min_num': 4, 'min_report_intv': 10, 'parser': 'report', 'show_individual_runtimes': True, 'task_label_plural': 'tasks', 'task_label_singular': 'task', 'write_to': {'file': {'path': '_report.txt'}}}}}, 'run_kwargs': {'timeout': None}, 'task_defaults': {'eval': {'create_symlinks': True, 'data_manager': {'out_dir_kwargs': {'exist_ok': False}}, 'out_dir': '{task_name:}', 'plot_manager': {'raise_exc': True}, 'priority': None}, 'run': {}}, 'tasks': {'eval': {}, 'run': {}}, 'worker_kwargs': {'forward_raw': True, 'forward_streams': False, 'popen_kwargs': {}, 'remove_ansi': True, 'save_raw': True, 'save_streams': True, 'streams_log_lvl': None}, 'worker_manager': {'interrupt_params': {'exit': False, 'grace_period': 5.0, 'send_signal': 'SIGTERM'}, 'lines_per_poll': 20, 'nonzero_exit_handling': 'warn_all', 'num_workers': 'auto', 'periodic_task_callback': 20, 'poll_delay': 0.05, 'rf_spec': {'after_abort': ['progress_bar', 'report_file'], 'after_work': ['progress_bar', 'report_file'], 'before_working': [], 'monitor_updated': ['progress_bar'], 'task_finished': ['progress_bar', 'report_file'], 'task_spawned': ['progress_bar'], 'while_working': ['progress_bar']}, 'save_streams_on': ['periodic_callback']}}#
Actual base configuration of the batch task manager
- utopya.batch._BTM_USER_DEFAULTS = {}#
User defaults for the batch task manager
- utopya.batch._BTM_DEFAULTS = {'cluster_mode': False, 'debug': False, 'parallelization_level': 'batch', 'paths': {'note': None, 'out_dir': '~/utopya_output/_batch'}, 'reporter': {'report_formats': {'progress_bar': {'info_fstr': '{total_progress:>5.1f}% ({cnt[finished]} / {cnt[total]})', 'min_report_intv': 0.5, 'num_cols': 'adaptive', 'parser': 'progress_bar', 'show_times': True, 'times_fstr': '| {elapsed} elapsed', 'times_fstr_final': '| finished in {elapsed:} ', 'times_kwargs': {'mode': 'from_start'}, 'write_to': 'stdout_noreturn'}, 'report_file': {'min_num': 4, 'min_report_intv': 10, 'parser': 'report', 'show_individual_runtimes': True, 'task_label_plural': 'tasks', 'task_label_singular': 'task', 'write_to': {'file': {'path': '_report.txt'}}}}}, 'run_kwargs': {'timeout': None}, 'task_defaults': {'eval': {'create_symlinks': True, 'data_manager': {'out_dir_kwargs': {'exist_ok': False}}, 'out_dir': '{task_name:}', 'plot_manager': {'raise_exc': True}, 'priority': None}, 'run': {}}, 'tasks': {'eval': {}, 'run': {}}, 'worker_kwargs': {'forward_raw': True, 'forward_streams': False, 'popen_kwargs': {}, 'remove_ansi': True, 'save_raw': True, 'save_streams': True, 'streams_log_lvl': None}, 'worker_manager': {'interrupt_params': {'exit': False, 'grace_period': 5.0, 'send_signal': 'SIGTERM'}, 'lines_per_poll': 20, 'nonzero_exit_handling': 'warn_all', 'num_workers': 'auto', 'periodic_task_callback': 20, 'poll_delay': 0.05, 'rf_spec': {'after_abort': ['progress_bar', 'report_file'], 'after_work': ['progress_bar', 'report_file'], 'before_working': [], 'monitor_updated': ['progress_bar'], 'task_finished': ['progress_bar', 'report_file'], 'task_spawned': ['progress_bar'], 'while_working': ['progress_bar']}, 'save_streams_on': ['periodic_callback']}}#
Aggregated and recursively updated default batch task manager config
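The "aggregated and recursively updated" defaults are presumably produced by recursively updating the base configuration with the user defaults. A minimal sketch of that pattern (utopya delegates this to its YAML/dict tooling; recursive_update here is an illustrative stand-in):

```python
def recursive_update(base: dict, update: dict) -> dict:
    """Recursively update ``base`` with entries from ``update``, in place.

    Nested dicts are merged key by key; all other values are overwritten.
    """
    for key, val in update.items():
        if isinstance(val, dict) and isinstance(base.get(key), dict):
            recursive_update(base[key], val)
        else:
            base[key] = val
    return base

# User defaults overwrite single leaf entries without clobbering siblings
defaults = recursive_update(
    {"debug": False,
     "paths": {"note": None, "out_dir": "~/utopya_output/_batch"}},
    {"paths": {"out_dir": "~/my_batch_output"}},
)
```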
- utopya.batch.INVALID_TASK_NAME_CHARS = ('/', ':', '.', '?', '*')#
Substrings that may not appear in task names
- utopya.batch._eval_task(*, task_name: str, _batch_name: str, _batch_dirs: dict, _task_cfg_path: str, _create_symlinks: bool, model_name: str, model_kwargs: dict = {}, use_data_tree_cache: Optional[bool] = None, print_tree: Union[bool, str] = 'condensed', plot_only: Optional[Sequence[str]] = None, plots_cfg: Optional[str] = None, update_plots_cfg: dict = {}, **frozen_mv_kwargs)[source]#
The evaluation task target for the multiprocessing.Process. It sets up a utopya.model.Model, loads the data, and performs plots.
- class utopya.batch.BatchTaskManager(*, batch_cfg_path: Optional[str] = None, **update_batch_cfg)[source]#
Bases:
object
A manager for batch tasks
- RUN_DIR_TIME_FSTR = '%y%m%d-%H%M%S'#
The time format string for the run directory
- __init__(*, batch_cfg_path: Optional[str] = None, **update_batch_cfg)[source]#
Sets up a BatchTaskManager.
- Parameters
batch_cfg_path (str, optional) – The batch file with all the task definitions.
**update_batch_cfg – Additional arguments that are used to update the batch configuration.
- Raises
NotImplementedError – If run_tasks or cluster_mode were set in the batch configuration.
- static _setup_batch_cfg(batch_cfg_path: str, **update_batch_cfg) dict [source]#
Sets up the BatchTaskManager configuration
- _setup_dirs(out_dir: str, note: Optional[str] = None) Tuple[Dict[str, str], str] [source]#
Sets up directories
- _add_tasks(tasks: dict, defaults: dict, add_task: Callable) int [source]#
Adds all configured run tasks to the WorkerManager’s task queue
- _add_eval_task(name: str, *, model_name: str, out_dir: str, enabled: bool = True, priority: Optional[int] = None, create_symlinks: bool = False, **eval_task_kwargs)[source]#
Adds a single evaluation task to the WorkerManager.
- Parameters
name (str) – Name of this task
model_name (str) – Model name; required by the task, thus already required here.
out_dir (str) – The path to the data output directory, i.e. the directory where all plots will end up in. This may be a format string containing any of the following keys: task_name, model_name, timestamp, batch_name (a combination of timestamp and the note). Relative paths are evaluated relative to the eval batch run directory.
enabled (bool, optional) – If False, will not add this task.
priority (int, optional) – Task priority; tasks with smaller value will be picked first.
create_symlinks (bool, optional) – Whether to create symlinks that add crosslinks between related directories, e.g.: from the output directory, link back to the task configuration; from the evaluation output directory alongside the simulation data, link to the batch output directory.
**eval_task_kwargs – All further evaluation task arguments.
utopya.cfg module#
Module that coordinates utopya’s persistent config directory
- utopya.cfg.UTOPYA_CFG_DIR = '/home/docs/.config/utopya'#
Path to the persistent utopya configuration directory
- utopya.cfg.UTOPYA_CFG_FILE_NAMES = {'batch': 'batch_cfg.yml', 'user': 'user_cfg.yml', 'utopya': 'utopya_cfg.yml'}#
Names and paths of valid configuration entries
- utopya.cfg.UTOPYA_CFG_FILE_PATHS = {'batch': '/home/docs/.config/utopya/batch_cfg.yml', 'user': '/home/docs/.config/utopya/user_cfg.yml', 'utopya': '/home/docs/.config/utopya/utopya_cfg.yml'}#
Absolute configuration file paths
- utopya.cfg.UTOPYA_CFG_SUBDIR_NAMES = {'models': 'models', 'projects': 'projects'}#
Names and paths of valid configuration subdirectories
- utopya.cfg.UTOPYA_CFG_SUBDIRS = {'models': '/home/docs/.config/utopya/models', 'projects': '/home/docs/.config/utopya/projects'}#
Absolute paths to configuration subdirectories
- utopya.cfg.PROJECT_INFO_FILE_SEARCH_PATHS = ('.utopya_project.yml', '.utopya-project.yml')#
Potential names of project info files, relative to base directory
- utopya.cfg.get_cfg_path(cfg_name: str) str [source]#
Returns the absolute path to the specified configuration file
- utopya.cfg.load_from_cfg_dir(cfg_name: str) dict [source]#
Load a configuration file; returns empty dict if no file exists.
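The lookup logic can be mirrored in a few lines. A minimal sketch using the constants documented above (the error message wording is illustrative, not utopya's):

```python
import os

# The documented names of utopya's persistent config files
UTOPYA_CFG_DIR = os.path.expanduser("~/.config/utopya")
UTOPYA_CFG_FILE_NAMES = {
    "batch": "batch_cfg.yml",
    "user": "user_cfg.yml",
    "utopya": "utopya_cfg.yml",
}

def get_cfg_path(cfg_name: str) -> str:
    """Return the absolute path of a named persistent config file."""
    try:
        fname = UTOPYA_CFG_FILE_NAMES[cfg_name]
    except KeyError:
        raise ValueError(
            f"Invalid configuration name '{cfg_name}'! "
            f"Available: {', '.join(UTOPYA_CFG_FILE_NAMES)}"
        ) from None
    return os.path.join(UTOPYA_CFG_DIR, fname)
```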
utopya.exceptions module#
utopya-specific exception types
- exception utopya.exceptions.UtopyaException[source]#
Bases:
BaseException
Base class for utopya-specific exceptions
- exception utopya.exceptions.ValidationError[source]#
Bases:
utopya.exceptions.UtopyaException
,ValueError
Raised upon failure to validate a parameter
- exception utopya.exceptions.WorkerManagerError[source]#
Bases:
utopya.exceptions.UtopyaException
The base exception class for WorkerManager errors
- exception utopya.exceptions.WorkerManagerTotalTimeout[source]#
Bases:
utopya.exceptions.WorkerManagerError
Raised when a total timeout occurred
- exception utopya.exceptions.WorkerTaskError[source]#
Bases:
utopya.exceptions.WorkerManagerError
Raised when there was an error in a WorkerTask
- exception utopya.exceptions.WorkerTaskNonZeroExit(task: utopya.task.WorkerTask, *args, **kwargs)[source]#
Bases:
utopya.exceptions.WorkerTaskError
Can be raised when a WorkerTask exited with a non-zero exit code.
- __init__(task: utopya.task.WorkerTask, *args, **kwargs)[source]#
Initialize an error handling non-zero exit codes from workers
- exception utopya.exceptions.WorkerTaskStopConditionFulfilled(task: utopya.task.WorkerTask, *args, **kwargs)[source]#
Bases:
utopya.exceptions.WorkerTaskNonZeroExit
An exception that is raised when a worker-specific stop condition was fulfilled. This allows being handled separately to other non-zero exits.
- exception utopya.exceptions.YAMLRegistryError[source]#
Bases:
utopya.exceptions.UtopyaException
,ValueError
Base class for errors in YAMLRegistry
- exception utopya.exceptions.EntryExistsError[source]#
Bases:
utopya.exceptions.YAMLRegistryError
Raised if an entry already exists
- exception utopya.exceptions.MissingEntryError[source]#
Bases:
utopya.exceptions.YAMLRegistryError
Raised if an entry is missing
- exception utopya.exceptions.MissingRegistryError[source]#
Bases:
utopya.exceptions.YAMLRegistryError
Raised if a registry is missing
- exception utopya.exceptions.EntryValidationError[source]#
Bases:
utopya.exceptions.YAMLRegistryError
Raised upon failed validation of a registry entry
- exception utopya.exceptions.SchemaValidationError[source]#
Bases:
utopya.exceptions.YAMLRegistryError
If schema validation failed
- exception utopya.exceptions.ModelRegistryError[source]#
Bases:
utopya.exceptions.UtopyaException
,ValueError
Raised on errors with model registry
- exception utopya.exceptions.MissingModelError[source]#
Bases:
utopya.exceptions.ModelRegistryError
Raised when a model is missing
- exception utopya.exceptions.BundleExistsError[source]#
Bases:
utopya.exceptions.ModelRegistryError
Raised when a bundle that compared equal already exists
- exception utopya.exceptions.MissingBundleError[source]#
Bases:
utopya.exceptions.ModelRegistryError
Raised when a bundle is missing
- exception utopya.exceptions.BundleValidationError[source]#
Bases:
utopya.exceptions.ModelRegistryError
Raised when validating the existence of a bundle fails
- exception utopya.exceptions.ProjectRegistryError[source]#
Bases:
utopya.exceptions.UtopyaException
,ValueError
Raised on errors with project registry
- exception utopya.exceptions.MissingProjectError[source]#
Bases:
utopya.exceptions.ProjectRegistryError
Raised on a missing project
- exception utopya.exceptions.ProjectExistsError[source]#
Bases:
utopya.exceptions.ProjectRegistryError
Raised if a project or project file of that name already exists
utopya.model module#
Provides the Model class for working interactively with registered utopya models.
- class utopya.model.Model(*, name: Optional[str] = None, info_bundle: Optional[utopya.model_registry.info_bundle.ModelInfoBundle] = None, bundle_label: Optional[str] = None, base_dir: Optional[str] = None, sim_errors: Optional[str] = None, use_tmpdir: bool = False)[source]#
Bases:
object
A class to work with Utopia models interactively.
It attaches to a certain model and makes it easy to load config files, create a Multiverse from them, run it, and work with it further…
- CONFIG_SET_MODEL_SOURCE_SUBDIRS = ('cfgs', 'cfg_sets', 'config_sets')#
Directories within the model source directories to search through when looking for configuration sets. These are not used if the utopya config contains an entry overwriting this.
- __init__(*, name: Optional[str] = None, info_bundle: Optional[utopya.model_registry.info_bundle.ModelInfoBundle] = None, bundle_label: Optional[str] = None, base_dir: Optional[str] = None, sim_errors: Optional[str] = None, use_tmpdir: bool = False)[source]#
Initialize the Model for the given model name
- Parameters
name (str, optional) – Name of the model to attach to. If not given, need to pass info_bundle.
info_bundle (ModelInfoBundle, optional) – The required information to work with this model. If not given, will attempt to find the model in the model registry via name or bundle_label.
bundle_label (str, optional) – A label to use for identifying the info bundle.
base_dir (str, optional) – For convenience, can specify this path which will be seen as the base path for config files; if set, arguments that allow specifying configuration files can specify them relative to this directory.
sim_errors (str, optional) – Whether to raise errors from Multiverse
use_tmpdir (bool, optional) – Whether to use a temporary directory to write data to. The default value can be set here, but the flag can be overwritten in the create_mv and create_run_load methods. For False, the regular model output directory is used.
- Raises
ValueError – Upon bad base_dir
- property info_bundle: utopya.model_registry.info_bundle.ModelInfoBundle#
The model info bundle
- property name: str#
The name of this Model object, which is at the same time the name of the attached model.
- property base_dir: str#
Returns the path to the base directory, if set during init.
This is the path to a directory from which config files can be loaded using relative paths.
- property default_model_cfg: dict#
Returns the default model configuration by loading it from the file specified in the info bundle.
- property default_config_set_search_dirs: List[str]#
Returns the default config set search directories for this model in the order of precedence:
1. Directories defined on the project level via cfg_set_abs_search_dirs; these may also be format strings supporting the following set of keys: model_name, project_base_dir, and model_source_dir (if set). If no project is associated, there will be no additional search directories.
2. Names of subdirectories relative to the model source directory, defined in cfg_set_model_source_subdirs. If no model source directory is known, no search directories will be added. If no project is associated, a standard set of search directories is used: cfgs, cfg_sets, config_sets, as defined in CONFIG_SET_MODEL_SOURCE_SUBDIRS.
Note
The output may contain relative paths.
- property default_config_sets: Dict[str, dict]#
Config sets at the default search locations.
To retrieve an individual config set, consider using get_config_set() instead of this property.
For more information, see Configuration Sets.
- create_mv(*, from_cfg: Optional[str] = None, from_cfg_set: Optional[str] = None, run_cfg_path: Optional[str] = None, use_tmpdir: Optional[bool] = None, **update_meta_cfg) utopya.multiverse.Multiverse [source]#
Creates a utopya.multiverse.Multiverse for this model, optionally loading a configuration from a file and updating it with further keys.
- Parameters
from_cfg (str, optional) – The name of the config file (relative to the base directory) to be used.
from_cfg_set (str, optional) – Name of the config set to retrieve the run config from. Mutually exclusive with from_cfg and run_cfg_path.
run_cfg_path (str, optional) – The path of the run config to use. Cannot be passed if the from_cfg or from_cfg_set arguments were given.
use_tmpdir (bool, optional) – Whether to use a temporary directory to write the data to. If not given, uses the default value set at initialization.
**update_meta_cfg – Can be used to update the meta configuration
- Returns
The created Multiverse object
- Return type
Multiverse
- Raises
ValueError – If more than one of the run-config-selecting arguments (from_cfg, from_cfg_set, run_cfg_path) were given.
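For illustration, a run configuration passed via from_cfg or run_cfg_path is a YAML file that updates the meta configuration. A minimal hypothetical sketch (the perform_sweep and parameter_space keys appear elsewhere on this page; the model-level keys below are made up):

```yaml
# Hypothetical minimal run configuration for a model named "MyModel";
# the keys below parameter_space depend entirely on the model
perform_sweep: false
parameter_space:
  MyModel:
    some_parameter: 0.5
```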
- create_frozen_mv(**fmv_kwargs) utopya.multiverse.FrozenMultiverse [source]#
Create a utopya.multiverse.FrozenMultiverse, coupling it to a run directory.
Use this method if you want to load an existing simulation run.
- Parameters
**fmv_kwargs – Passed on to utopya.multiverse.FrozenMultiverse.__init__()
- create_run_load(*, from_cfg: Optional[str] = None, run_cfg_path: Optional[str] = None, from_cfg_set: Optional[str] = None, use_tmpdir: Optional[bool] = None, print_tree: bool = True, **update_meta_cfg) Tuple[utopya.multiverse.Multiverse, utopya.eval.datamanager.DataManager] [source]#
Chains the create_mv(), mv.run, and mv.dm.load_from_cfg method calls together and returns a (Multiverse, DataManager) tuple.
- Parameters
from_cfg (str, optional) – The name of the config file (relative to the base directory) to be used.
from_cfg_set (str, optional) – Name of the config set to retrieve the run config from. Mutually exclusive with from_cfg and run_cfg_path.
run_cfg_path (str, optional) – The path of the run config to use. Cannot be passed if the from_cfg or from_cfg_set arguments were given.
use_tmpdir (bool, optional) – Whether to use a temporary directory to write the data to. If not given, uses the default value set at initialization.
print_tree (bool, optional) – Whether to print the loaded data tree
**update_meta_cfg – Arguments passed to the create_mv function
- Returns
The created Multiverse and the corresponding DataManager (with data already loaded).
- Return type
Tuple[Multiverse, DataManager]
- get_config_set(name: Optional[str] = None) Dict[str, str] [source]#
Returns a configuration set: a dict containing paths to run and/or eval configuration files. These are accessible via the keys run and eval.
Config sets are retrieved from multiple locations:

- The cfgs directory in the model’s source directory
- The user-specified lookup directories, specified in the utopya configuration as config_set_search_dirs
- If name is an absolute or relative path, and a directory exists at the specified location, the parent directory is interpreted as a search path.

This uses get_config_sets() to retrieve all available configuration sets from the above paths and then selects the one with the given name. Config sets that are found later overwrite those with the same name found in previous searches and log a warning message (which can be controlled with the warn argument); sets are not merged.
For more information, see Configuration Sets.
- Parameters
name (str, optional) – The name of the config set to retrieve. This may also be a local path, which is looked up prior to the default search directories.
- get_config_sets(*, search_dirs: Optional[List[str]] = None, warn: bool = True, cfg_sets: Optional[dict] = None) Dict[str, dict] [source]#
Searches for all available configuration sets in the given search directories, aggregating them into one dict.
The search is done in reverse order of the paths given in search_dirs, i.e. starting from the directories with the lowest precedence. If configuration sets with the same name are encountered, warnings are emitted, but the one with higher precedence (appearing closer to the front of search_dirs, i.e. the later-searched one) will take precedence.
Note
This will not merge configuration sets from different search directories, e.g. if one contained only an eval configuration and the other contained only a run configuration, a warning will be emitted but the entry from the later-searched directory will be used.
- Parameters
search_dirs (List[str], optional) – The directories to search sequentially for config sets. If not given, will use the default config set search directories, see default_config_set_search_dirs.
warn (bool, optional) – Whether to warn (via log message) if the search yields a config set with a name that already existed.
cfg_sets (dict, optional) – If given, aggregate newly found config sets into this dict. Otherwise, start with an empty one.
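The precedence rule described above can be sketched in a few lines of pure Python (collect_config_sets is a hypothetical helper; the real method also inspects directory contents to decide what counts as a config set):

```python
import os
from typing import Dict, List

def collect_config_sets(search_dirs: List[str], *, warn=print) -> Dict[str, str]:
    """Aggregate config sets as a name -> path dict.

    Directories at the front of ``search_dirs`` are searched last, so
    their entries overwrite (and thus take precedence over) earlier ones.
    """
    cfg_sets: Dict[str, str] = {}
    for search_dir in reversed(search_dirs):  # lowest precedence first
        if not os.path.isdir(search_dir):
            continue
        for name in sorted(os.listdir(search_dir)):
            path = os.path.join(search_dir, name)
            if not os.path.isdir(path):
                continue
            if name in cfg_sets:
                warn(f"Config set '{name}' in {search_dir} "
                     "overwrites an existing one.")
            cfg_sets[name] = path
    return cfg_sets
```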
- _store_mv(mv: utopya.multiverse.Multiverse, **kwargs) None [source]#
Stores a created Multiverse object and all the kwargs in a dict
- _create_tmpdir() tempfile.TemporaryDirectory [source]#
Create a TemporaryDirectory
utopya.multiverse module#
Implementation of the Multiverse class, which sits at the heart of utopya and supplies the main user interface for the frontend. It allows running a simulation and then evaluating it.
- class utopya.multiverse.Multiverse(*, model_name: Optional[str] = None, info_bundle: Optional[utopya.model_registry.info_bundle.ModelInfoBundle] = None, run_cfg_path: Optional[str] = None, user_cfg_path: Optional[str] = None, _shared_worker_manager: Optional[utopya.workermanager.WorkerManager] = None, **update_meta_cfg)[source]#
Bases:
object
The Multiverse is where a single simulation run is orchestrated from.
It spawns multiple universes, each of which represents a single simulation of the selected model with the parameters specified by the meta configuration. The WorkerManager takes care of performing these simulations in parallel.
The Multiverse then interfaces with the dantro data processing pipeline using classes specialized in utopya.eval: the DataManager loads the created simulation output, making it available in a uniformly accessible and hierarchical data tree; the PlotManager then handles plotting of that data.
- BASE_META_CFG_PATH = '/home/docs/checkouts/readthedocs.org/user_builds/utopya/checkouts/latest/utopya/cfg/base_cfg.yml'#
Where the default meta configuration can be found
- USER_CFG_SEARCH_PATH = '/home/docs/.config/utopya/user_cfg.yml'#
Where to look for the user configuration
- RUN_DIR_TIME_FSTR = '%y%m%d-%H%M%S'#
The time format string for the run directory
- UTOPYA_BASE_PLOTS_PATH = '/home/docs/checkouts/readthedocs.org/user_builds/utopya/checkouts/latest/utopya/cfg/base_plots.yml'#
Where the utopya base plots configuration can be found; this is passed to the PlotManager.
- __init__(*, model_name: Optional[str] = None, info_bundle: Optional[utopya.model_registry.info_bundle.ModelInfoBundle] = None, run_cfg_path: Optional[str] = None, user_cfg_path: Optional[str] = None, _shared_worker_manager: Optional[utopya.workermanager.WorkerManager] = None, **update_meta_cfg)[source]#
Initialize the Multiverse.
- Parameters
model_name (str, optional) – The name of the model to run
info_bundle (ModelInfoBundle, optional) – The model information bundle that includes information about the executable path etc. If not given, will attempt to read it from the model registry.
run_cfg_path (str, optional) – The path to the run configuration.
user_cfg_path (str, optional) – If given, this is used to update the base configuration. If None, will look for it in the default path, see Multiverse.USER_CFG_SEARCH_PATH.
_shared_worker_manager (WorkerManager, optional) –
If given, this already existing WorkerManager instance (and its reporter) will be used instead of initializing new instances.
Warning
This argument is only exposed for internal purposes. It should not be used for production code and behavior of this argument may change at any time.
**update_meta_cfg – Can be used to update the meta configuration generated from the previous configuration levels
- property info_bundle: utopya.model_registry.info_bundle.ModelInfoBundle#
The model info bundle for this Multiverse
- property model: utopya.model.Model#
A model instance, created ad-hoc using the associated info bundle
- property resolved_cluster_params: dict#
Returns a copy of the cluster configuration with all parameters resolved. This makes some additional keys available on the top level.
- property dm: utopya.eval.datamanager.DataManager#
The Multiverse’s DataManager.
- property wm: utopya.workermanager.WorkerManager#
The Multiverse’s WorkerManager.
- property pm: utopya.eval.plotmanager.PlotManager#
The Multiverse’s PlotManager.
- run(*, sweep: Optional[bool] = None)[source]#
Starts a Utopia simulation run.
Specifically, this method adds simulation tasks to the associated WorkerManager, locks its task list, and then invokes the start_working() method, which performs all the simulation tasks.
If cluster mode is enabled, this will split up the parameter space into (ideally) equally sized parts and only run one of these parts, depending on the cluster node this Multiverse is being invoked on.
Note
As this method locks the task list of the WorkerManager, no further tasks can be added henceforth. This means that each Multiverse instance can only perform a single simulation run.
- Parameters
sweep (bool, optional) – Whether to perform a sweep or not. If None, the value will be read from the perform_sweep key of the meta configuration.
- run_single()[source]#
Runs a single simulation using the parameter space’s default value.
See run() for more information.
- renew_plot_manager(**update_kwargs)[source]#
Tries to set up a new PlotManager. If this succeeds, the old one is discarded and the new one is associated with this Multiverse.
- Parameters
**update_kwargs – Passed on to PlotManager.__init__
- _create_meta_cfg(*, run_cfg_path: str, user_cfg_path: str, update_meta_cfg: dict) dict [source]#
Create the meta configuration from several parts and store it.
The final configuration dict is built from multiple components, where each part recursively updates the previous level. The resulting configuration is the meta configuration and is stored in the meta_cfg attribute.
The parts are recorded in the cfg_parts dict and returned such that a backup can be created.
- Parameters
- Returns
dict of the parts that were needed to create the meta config. The dict key corresponds to the part name; the value is the payload, which can be either a path to a cfg file or a dict.
- Return type
dict
- _apply_debug_level(lvl: Optional[int] = None)[source]#
Depending on the debug level, applies certain settings to the Multiverse and the runtime environment.
Note
This does not (yet) set the corresponding debug flags for the PlotManager, DataManager, or WorkerManager!
- _create_run_dir(*, out_dir: str, model_note: Optional[str] = None) None [source]#
Create the folder structure for the run output.
For the chosen model name and current timestamp, the run directory will be of the form <timestamp>_<model_note> and be part of the following directory tree:

    utopya_output
    ├── model_a
    │   └── 180301-125410_my_model_note
    │       ├── config
    │       ├── data
    │       │   ├── uni000
    │       │   ├── uni001
    │       │   └── ...
    │       └── eval
    └── model_b
        ├── 180301-125412_my_first_sim
        └── 180301-125413_my_second_sim

If running in cluster mode, the cluster parameters are resolved and used to determine the name of the simulation. The pattern then does not include a timestamp, as each node might not return quite the same value. Instead, a value from an environment variable is used. The resulting path can have different forms, depending on which environment variables were present; required parts are denoted by a * in the following pattern, and if the value of one of the other entries is not available, the connecting underscore will not be used:

    {timestamp}_{job id*}_{cluster}_{job account}_{job name}_{note}
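The "skip missing parts, skip their underscores" joining described above can be sketched as follows (make_run_dir_name is a hypothetical helper, not utopya API):

```python
def make_run_dir_name(parts: dict) -> str:
    """Join available run-directory name parts with underscores.

    Entries that are missing or None are skipped entirely, so no
    double underscores appear in the resulting name.
    """
    order = ("timestamp", "job_id", "cluster", "job_account", "job_name", "note")
    return "_".join(str(parts[k]) for k in order if parts.get(k) is not None)
```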
- Parameters
- Raises
RuntimeError – If the simulation directory already existed. This should not occur, as the timestamp is unique. If it occurs, you either started two simulations very close to each other or something is seriously wrong. Strange time zone perhaps?
- _setup_pm(**update_kwargs) utopya.eval.plotmanager.PlotManager [source]#
Helper function to set up a PlotManager instance
- _parse_base_cfg_pools(base_cfg_pools: List[Union[str, Tuple[str, Union[dict, str]]]]) List[Tuple[str, Union[dict, str]]] [source]#
Prepares the `base_cfg_pools` argument to be valid input to the PlotManager. This method resolves format strings and thus allows defining base config pools more generically.

Possible formats for each entry of the `base_cfg_pools` argument are:

- A 2-tuple `(name, pool dict)`, which specifies the name of the base config pool alongside the pool entries.
- A 2-tuple `(name, path to pool config file)`, which is later loaded by the PlotManager.
- A shortcut key which resolves to the corresponding 2-tuple. Available shortcuts are: `utopya_base`, `framework_base`, `project_base`, and `model_base`.

Both the pool name and the path may be format strings which get resolved with the `model_name` key and (in the case of the path) the full `paths` dict of the current model's info bundle. A format string may look like this:

"{paths[source_dir]}/{model_name}_more_plots.yml"
"~/some/more/plots/{model_name}/plots.yml"

If such a path cannot be resolved, an error is logged and an empty pool is used instead; this allows more flexibility in defining locations for additional config pools.
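The format-string resolution described above can be illustrated with plain `str.format` (the model name and paths below are hypothetical values, as might be found in a model's info bundle):

```python
# Hypothetical model name and paths dict from a model's info bundle
model_name = "MyModel"
paths = {"source_dir": "/home/me/models/MyModel"}

# A pool path template as it might appear among the base config pools ...
template = "{paths[source_dir]}/{model_name}_more_plots.yml"

# ... resolved with the model_name key and the paths dict
pool_path = template.format(model_name=model_name, paths=paths)
print(pool_path)  # /home/me/models/MyModel/MyModel_more_plots.yml
```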
- _perform_backup(*, cfg_parts: dict, backup_cfg_files: bool = True, backup_executable: bool = False, include_git_info: bool = True) None [source]#
Performs a backup of the information that can be used to recreate a simulation.

The configuration files are backed up into the `config` subdirectory of the run directory. All other relevant information is stored in an additionally created `backup` subdirectory.

Warning
These backups are created prior to the start of the actual simulation run and contain only information known at that point. Any changes to the meta configuration made after initialization of the Multiverse will not be reflected in these backups.

In particular, the `perform_sweep` and `parameter_space` entries of the meta configuration may not reflect which form of parameter space iteration was actually performed, because the `run_single` and `run_sweep` methods overwrite this behavior. To that end, that information is separately stored once the `run` methods are invoked.

- Parameters
cfg_parts (dict) – A dict of either paths to configuration files or dict-like data that is to be dumped into a configuration file.
backup_cfg_files (bool, optional) – Whether to back up the individual configuration files (i.e. the `cfg_parts` information). If False, the meta configuration will still be backed up.
backup_executable (bool, optional) – Whether to back up the executable. Note that these files can sometimes be quite large.
include_git_info (bool, optional) – If True, will store information about the state of the project’s (and framework’s, if existent) git repository.
- _perform_pspace_backup(pspace: paramspace.paramspace.ParamSpace, *, filename: str = 'parameter_space', **info_kwargs)[source]#
Stores the given parameter space and its metadata into the `config` directory. Two files will be produced:

- `config/{filename}.yml`: the passed `pspace` object
- `config/{filename}_info.yml`: the passed `pspace` object's info dictionary, containing relevant metadata (and the additionally passed `info_kwargs`)

Note
This method is separated from the regular backup method `_perform_backup()` because the parameter space that is used during a simulation run may be a lower-dimensional version of the one the Multiverse was initialized with. To that end, `run()` will invoke this backup function again once the relevant information is fully determined. This is important because it is needed to communicate the correct information about the sweep to objects downstream in the pipeline (e.g. `MultiversePlotCreator`).

- Parameters
pspace (paramspace.paramspace.ParamSpace) – The ParamSpace object to save as backup.
filename (str, optional) – The filename (without extension!) to use. (This is also used for the log message, with underscores changed to spaces.)
**info_kwargs – Additional kwargs that are to be stored in the metadata dict.
- _prepare_executable(*, run_from_tmpdir: bool = False) None [source]#
Prepares the model executable, potentially copying it to a temporary location.
Note that `run_from_tmpdir` requires the executable to be relocatable to another location, i.e. to be position-independent.

- Parameters
run_from_tmpdir (bool, optional) – Whether to copy the executable to a temporary directory that goes out of scope once the Multiverse instance goes out of scope.
- Raises
FileNotFoundError – On missing file at model binary path
PermissionError – On wrong access rights of file at the binary path
- _resolve_cluster_params() dict [source]#
This resolves the cluster parameters, e.g. by setting parameters depending on certain environment variables. This function is called by the resolved_cluster_params property.
- Returns
The resolved cluster configuration parameters
- Return type
- Raises
ValueError – If a required environment variable was missing or empty
- _add_sim_task(*, uni_id_str: str, uni_cfg: dict, is_sweep: bool) None [source]#
Helper function that handles task assignment to the WorkerManager.
This function creates a WorkerTask that will perform the following actions once it is grabbed and worked at:
Create a universe (folder) for the task (simulation)
Write that universe’s configuration to a yaml file in that folder
Create the correct arguments for the call to the model binary
To that end, this method gathers all necessary arguments and registers a WorkerTask with the WorkerManager.
- Parameters
uni_id_str (str) – The zero-padded uni id string
uni_cfg (dict) – The configuration of this universe, as given by the ParamSpace iteration.
is_sweep (bool) – Flag is needed to distinguish between sweeps and single simulations. With this information, the forwarding of a simulation’s output stream can be controlled.
- Raises
RuntimeError – If adding the simulation task failed
- _add_sim_tasks(*, sweep: Optional[bool] = None) int [source]#
Adds the simulation tasks needed for a single run or for a sweep.
- Parameters
sweep (bool, optional) – Whether tasks for a parameter sweep should be added or only for a single universe. If None, will read the `perform_sweep` key from the meta-configuration.

- Returns
The number of added tasks.
- Return type
- Raises
ValueError – On `sweep == True` and zero-volume parameter space.
- _validate_meta_cfg() bool [source]#
Goes through the parameters that require validation, validates them, and creates a useful error message if there were invalid parameters.
- Returns
- True if all parameters are valid; None if no check was done.
Note that False will never be returned, but a ValidationError will be raised instead.
- Return type
- Raises
ValidationError – If validation failed.
- class utopya.multiverse.FrozenMultiverse(*, model_name: Optional[str] = None, info_bundle: Optional[utopya.model_registry.info_bundle.ModelInfoBundle] = None, run_dir: Optional[str] = None, run_cfg_path: Optional[str] = None, user_cfg_path: Optional[str] = None, use_meta_cfg_from_run_dir: bool = False, **update_meta_cfg)[source]#
Bases:
utopya.multiverse.Multiverse
A frozen Multiverse is like a Multiverse, but frozen.
It is initialized from a finished `Multiverse` run and re-creates all the attributes from that data, e.g.: the meta configuration, a DataManager, and a PlotManager.

Note
A frozen multiverse is no longer able to perform any simulations.
- __init__(*, model_name: Optional[str] = None, info_bundle: Optional[utopya.model_registry.info_bundle.ModelInfoBundle] = None, run_dir: Optional[str] = None, run_cfg_path: Optional[str] = None, user_cfg_path: Optional[str] = None, use_meta_cfg_from_run_dir: bool = False, **update_meta_cfg)[source]#
Initializes the FrozenMultiverse from a model name and the name of a run directory.
Note that this also takes arguments to specify the run configuration to use.
- Parameters
model_name (str) – The name of the model to load. From this, the model output directory is determined and the run_dir will be seen as relative to that directory.
info_bundle (ModelInfoBundle, optional) – The model information bundle that includes information about the binary path etc. If not given, will attempt to read it from the model registry.
run_dir (str, optional) – The run directory to load. Can be a path relative to the current working directory, an absolute path, or the timestamp of the run directory. If not given, will use the most recent timestamp.
run_cfg_path (str, optional) – The path to the run configuration.
user_cfg_path (str, optional) – If given, this is used to update the base configuration. If None, will look for it in the default path, see Multiverse.USER_CFG_SEARCH_PATH.
use_meta_cfg_from_run_dir (bool, optional) – If True, will load the meta configuration from the given run directory; only works for absolute run directories.
**update_meta_cfg – Can be used to update the meta configuration generated from the previous configuration levels
- _create_run_dir(*, out_dir: str, run_dir: str, **__)[source]#
Helper function to find the run directory from arguments given to `__init__()`.

Overwrites the method from the parent Multiverse class, because the FrozenMultiverse does not require setting up a new run directory but should instead identify the existing one and create an appropriate output directory.
utopya.parameter module#
This module implements the `Parameter` class, which is used when validating model and simulation parameters.
- class utopya.parameter.Parameter(*, default: Any, name: Optional[str] = None, description: Optional[str] = None, is_any_of: Optional[Sequence[Any]] = None, limits: Optional[Tuple[Optional[float], Optional[float]]] = None, limits_mode: str = '[]', dtype: Optional[Union[str, type]] = None)[source]#
Bases:
object
The parameter class is used when a model parameter needs to be validated before commencing the model run. It can hold information on the parameter itself as well as its valid range and type and other meta-data.
By default, the `Parameter` class should be assumed to handle scalar parameters like numerical values or strings. For validating sequence-like parameters, corresponding specializing classes are to be implemented.

- SHORTHAND_MODES: Dict[str, Callable] = {'is-bool': <function Parameter.<lambda>>, 'is-in-unit-interval': <function Parameter.<lambda>>, 'is-int': <function Parameter.<lambda>>, 'is-negative': <function Parameter.<lambda>>, 'is-negative-int': <function Parameter.<lambda>>, 'is-negative-or-zero': <function Parameter.<lambda>>, 'is-positive': <function Parameter.<lambda>>, 'is-positive-int': <function Parameter.<lambda>>, 'is-positive-or-zero': <function Parameter.<lambda>>, 'is-probability': <function Parameter.<lambda>>, 'is-string': <function Parameter.<lambda>>, 'is-unsigned': <function Parameter.<lambda>>}#
Shorthand mode factory functions. These are used in the `from_shorthand()` class method to generate a `Parameter` object more easily.

Also, `utopya.yaml` registers each of these shorthand modes as a YAML constructor for the tag `!<mode>`.
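For instance, a model configuration may use these tags directly in YAML (the parameter names below are made up for illustration; the tag names are those listed in `SHORTHAND_MODES`):

```yaml
# Hypothetical model parameters, validated via shorthand Parameter tags
infection_rate: !is-probability 0.2
num_agents: !is-positive-int 100
enable_logging: !is-bool true
```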
- LIMIT_COMPS: Dict[str, Callable] = {'(': <built-in function gt>, ')': <built-in function lt>, '[': <built-in function ge>, ']': <built-in function le>}#
Comparators for the `limits` check, depending on `limits_mode`
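The limits check implied by this mapping can be sketched as follows (a minimal illustration; `in_limits` is a hypothetical helper, not part of utopya):

```python
import operator

# Mirrors the LIMIT_COMPS mapping: bracket character -> comparison operator
LIMIT_COMPS = {"(": operator.gt, ")": operator.lt,
               "[": operator.ge, "]": operator.le}

def in_limits(value, low, high, mode="[]"):
    # A None bound stands for negative/positive infinity, i.e. no check
    lo_ok = low is None or LIMIT_COMPS[mode[0]](value, low)
    hi_ok = high is None or LIMIT_COMPS[mode[1]](value, high)
    return lo_ok and hi_ok
```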
- __init__(*, default: Any, name: Optional[str] = None, description: Optional[str] = None, is_any_of: Optional[Sequence[Any]] = None, limits: Optional[Tuple[Optional[float], Optional[float]]] = None, limits_mode: str = '[]', dtype: Optional[Union[str, type]] = None)[source]#
Creates a new Parameter object, which holds a default value as well as some constraints on the possible values this parameter can assume.
- Parameters
default (Any) – the default value of the parameter.
name (str, optional) – the name of the parameter.
description (str, optional) – a description of this parameter or its effects.
is_any_of (Sequence[Any], optional) – a sequence of possible values this parameter can assume. If this parameter is given, `limits` cannot be used.
limits (Tuple[Optional[float], Optional[float]], optional) – the upper and lower bounds of the parameter (only applicable to scalar numerals). If None, the bound is assumed to be negative or positive infinity, respectively. Whether boundary values are included in the interval is controlled by the `limits_mode` argument. This argument is mutually exclusive with `is_any_of`!
limits_mode (str, optional) – whether to interpret the limits as an open, closed, or semi-closed interval. Possible values: `'[]'` (closed, default), `'()'` (open), `'[)'`, and `'(]'`.
dtype (Union[str, type], optional) – expected data type of this parameter. Accepts all strings that are accepted by `numpy.dtype`, e.g. `int`, `float`, `uint16`, `string`.
- Raises
TypeError – On a `limits` argument that was not tuple-like, or if a `limits` argument was given but the `default` was a string.
ValueError – If an invalid `limits_mode` is passed, if `limits` and `is_any_of` are both passed, or if the `limits` argument did not have length 2.
- property default#
The default value for this parameter
- property name#
The name of this parameter
- property description#
The description of this parameter
- property is_any_of: Tuple[Any]#
Possible values this parameter may assume
- property dtype: Optional[numpy.dtype]#
The expected data type of this parameter
- validate(value: Any, *, raise_exc: bool = True) bool [source]#
Checks whether the given value would be a valid parameter.
The checks for the corresponding arguments are carried out in the following order:

- is_any_of
- dtype
- limits

The data type is checked according to the numpy type hierarchy, see the numpy documentation. To reduce strictness, the following additional compatibilities are taken into account:

- for an unsigned integer `dtype`, a signed integer-type `value` is compatible if `value >= 0`
- for a floating-point `dtype`, integer-type `value`s are always considered compatible
- for a floating-point `dtype`, `value`s of all floating-point types are considered compatible, even if they have a lower precision (note the coercion test below, though)

Additionally, it is checked whether `value` is representable as the target data type. This is done by coercing `value` to `dtype` and then checking for equality (using np.isclose).

- Parameters
value (Any) – The value to test.
raise_exc (bool, optional) – Whether to raise an exception or not.
- Returns
Whether or not the given value is a valid parameter.
- Return type
- Raises
ValidationError – If validation failed or is impossible (for instance due to ambiguous validity parameters). This error message contains further information on why validation failed.
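The coercion test described above can be sketched with NumPy (a simplified illustration of the idea; `representable` is a hypothetical helper, not utopya's exact implementation):

```python
import numpy as np

def representable(value, dtype) -> bool:
    # Coerce the value to the target dtype and check that no
    # information was lost, using np.isclose for the comparison
    coerced = np.dtype(dtype).type(value)
    return bool(np.isclose(coerced, value))
```

For example, `0.1` is not representable as an integer dtype because the coercion truncates it to `0`, which is not close to `0.1`.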
- classmethod from_shorthand(default: Any, *, mode: str, **kwargs)[source]#
Constructs a Parameter object from a given shorthand mode.
- Parameters
default (Any) – the default value for the parameter
mode (str) – A valid shorthand mode, see `SHORTHAND_MODES`
**kwargs – any further arguments for Parameter initialization, see `__init__()`.
- Returns
a Parameter object
- utopya.parameter.extract_validation_objects(model_cfg: dict, *, model_name: str) Tuple[dict, dict] [source]#
Extracts all `Parameter` objects from a model configuration (a nested dict), replacing them with their default values. Returns both the modified model configuration as well as the Parameter objects (keyed by the key sequence necessary to reach them within the model configuration).

- Parameters
- Returns
- a tuple of (model config, parameters to validate).
The model config contains the passed config dict in which all Parameter class elements have been replaced by their default entries. The second entry is a dictionary consisting of the Parameter class objects (requiring validation), with keys being key sequences to those Parameter objects. Note that the key sequence is relative to the level above the model configuration, with `model_name` as a common entry for all returned values.
- Return type
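The extraction logic can be sketched as follows (a minimal stand-in illustration; `Param` and `extract` are hypothetical names, not utopya's actual classes or functions):

```python
# Stand-in for a Parameter-like object that carries a default value
class Param:
    def __init__(self, default):
        self.default = default

def extract(cfg: dict, _keys=()):
    """Recursively replace Param objects by their defaults, collecting
    the Param objects keyed by the key sequence needed to reach them."""
    out, to_validate = {}, {}
    for k, v in cfg.items():
        if isinstance(v, Param):
            out[k] = v.default
            to_validate[_keys + (k,)] = v
        elif isinstance(v, dict):
            out[k], sub = extract(v, _keys + (k,))
            to_validate.update(sub)
        else:
            out[k] = v
    return out, to_validate
```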
utopya.plotting module#
DEPRECATED module that provides backwards-compatibility for the old utopya module structure.
Deprecated since version 1.0.0: This module will be removed soon, please use utopya.eval
instead.
utopya.project_registry module#
Implementation of the utopya project registry
- class utopya.project_registry.ProjectPaths(*, base_dir: pathlib.Path, project_info: Optional[pathlib.Path] = None, models_dir: Optional[pathlib.Path] = None, py_tests_dir: Optional[pathlib.Path] = None, py_plots_dir: Optional[pathlib.Path] = None, mv_project_cfg: Optional[pathlib.Path] = None, project_base_plots: Optional[pathlib.Path] = None)[source]#
Bases:
utopya._yaml_registry.entry.BaseSchema
Schema to use for a project's `paths` field

- base_dir: pathlib.Path#
- project_info: Optional[pathlib.Path]#
- models_dir: Optional[pathlib.Path]#
- py_tests_dir: Optional[pathlib.Path]#
- py_plots_dir: Optional[pathlib.Path]#
- mv_project_cfg: Optional[pathlib.Path]#
- project_base_plots: Optional[pathlib.Path]#
- _abc_impl = <_abc._abc_data object>#
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'validate_assignment': True, 'validate_default': True}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_fields: ClassVar[dict[str, FieldInfo]] = {'base_dir': FieldInfo(annotation=Path, required=True, metadata=[PathType(path_type='dir')]), 'models_dir': FieldInfo(annotation=Union[Annotated[Path, PathType], NoneType], required=False), 'mv_project_cfg': FieldInfo(annotation=Union[Annotated[Path, PathType], NoneType], required=False), 'project_base_plots': FieldInfo(annotation=Union[Annotated[Path, PathType], NoneType], required=False), 'project_info': FieldInfo(annotation=Union[Annotated[Path, PathType], NoneType], required=False), 'py_plots_dir': FieldInfo(annotation=Union[Annotated[Path, PathType], NoneType], required=False), 'py_tests_dir': FieldInfo(annotation=Union[Annotated[Path, PathType], NoneType], required=False)}#
Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].
This replaces Model.__fields__ from Pydantic V1.
- class utopya.project_registry.ProjectMetadata(*, version: Optional[str] = None, long_name: Optional[str] = None, description: Optional[str] = None, long_description: Optional[str] = None, license: Optional[str] = None, authors: Optional[List[str]] = None, email: Optional[str] = None, website: Optional[str] = None, utopya_compatibility: Optional[str] = None, language: Optional[str] = None, requirements: Optional[List[str]] = None, misc: Optional[Dict[str, Any]] = None)[source]#
Bases:
utopya._yaml_registry.entry.BaseSchema
Schema to use for a project's `metadata` field

- _abc_impl = <_abc._abc_data object>#
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'validate_assignment': True, 'validate_default': True}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_fields: ClassVar[dict[str, FieldInfo]] = {'authors': FieldInfo(annotation=Union[List[str], NoneType], required=False), 'description': FieldInfo(annotation=Union[str, NoneType], required=False), 'email': FieldInfo(annotation=Union[str, NoneType], required=False), 'language': FieldInfo(annotation=Union[str, NoneType], required=False), 'license': FieldInfo(annotation=Union[str, NoneType], required=False), 'long_description': FieldInfo(annotation=Union[str, NoneType], required=False), 'long_name': FieldInfo(annotation=Union[str, NoneType], required=False), 'misc': FieldInfo(annotation=Union[Dict[str, Any], NoneType], required=False), 'requirements': FieldInfo(annotation=Union[List[str], NoneType], required=False), 'utopya_compatibility': FieldInfo(annotation=Union[str, NoneType], required=False), 'version': FieldInfo(annotation=Union[str, NoneType], required=False), 'website': FieldInfo(annotation=Union[str, NoneType], required=False)}#
Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].
This replaces Model.__fields__ from Pydantic V1.
- class utopya.project_registry.ProjectSettings(*, preload_project_py_plots: Optional[bool] = None, preload_framework_py_plots: Optional[bool] = None)[source]#
Bases:
utopya._yaml_registry.entry.BaseSchema
Schema to use for a project's `settings` field

- preload_project_py_plots: Optional[bool]#
Whether to preload the project-level plot module (`py_plots_dir`) after initialization of the `PlotManager`. If not given, will load the module.

- preload_framework_py_plots: Optional[bool]#
Whether to preload the framework-level plot module (`py_plots_dir`) after initialization of the `PlotManager`. If not given, will load the module.
- _abc_impl = <_abc._abc_data object>#
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'validate_assignment': True, 'validate_default': True}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_fields: ClassVar[dict[str, FieldInfo]] = {'preload_framework_py_plots': FieldInfo(annotation=Union[bool, NoneType], required=False), 'preload_project_py_plots': FieldInfo(annotation=Union[bool, NoneType], required=False)}#
Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].
This replaces Model.__fields__ from Pydantic V1.
- class utopya.project_registry.ProjectSchema(*, project_name: str, framework_name: Optional[str] = None, paths: utopya.project_registry.ProjectPaths, metadata: utopya.project_registry.ProjectMetadata, settings: utopya.project_registry.ProjectSettings = {}, run_cfg_format: str = 'yaml', cfg_set_abs_search_dirs: Optional[List[str]] = None, cfg_set_model_source_subdirs: Optional[List[str]] = None, custom_py_modules: Optional[Dict[str, pathlib.Path]] = None, output_files: Optional[dict] = None, debug_level_updates: Optional[Dict[str, dict]] = None)[source]#
Bases:
utopya._yaml_registry.entry.BaseSchema
The data model for a project registry entry
- metadata: utopya.project_registry.ProjectMetadata#
- settings: utopya.project_registry.ProjectSettings#
- custom_py_modules: Optional[Dict[str, pathlib.Path]]#
- _abc_impl = <_abc._abc_data object>#
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'validate_assignment': True, 'validate_default': True}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_fields: ClassVar[dict[str, FieldInfo]] = {'cfg_set_abs_search_dirs': FieldInfo(annotation=Union[List[str], NoneType], required=False), 'cfg_set_model_source_subdirs': FieldInfo(annotation=Union[List[str], NoneType], required=False), 'custom_py_modules': FieldInfo(annotation=Union[Dict[str, Annotated[Path, PathType]], NoneType], required=False), 'debug_level_updates': FieldInfo(annotation=Union[Dict[str, dict], NoneType], required=False), 'framework_name': FieldInfo(annotation=Union[str, NoneType], required=False), 'metadata': FieldInfo(annotation=ProjectMetadata, required=True), 'output_files': FieldInfo(annotation=Union[dict, NoneType], required=False), 'paths': FieldInfo(annotation=ProjectPaths, required=True), 'project_name': FieldInfo(annotation=str, required=True), 'run_cfg_format': FieldInfo(annotation=str, required=False, default='yaml'), 'settings': FieldInfo(annotation=ProjectSettings, required=False, default={})}#
Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].
This replaces Model.__fields__ from Pydantic V1.
- class utopya.project_registry.Project(name: str, *, registry: YAMLRegistry = None, **data)[source]#
Bases:
utopya._yaml_registry.entry.RegistryEntry
A registry entry that describes a project
- SCHEMA#
- property framework_project: Optional[utopya.project_registry.Project]#
If a framework project is defined, retrieve it from the registry
- get_git_info(*, include_patch_info: bool = False) dict [source]#
Returns information about the state of this project's git repository using the `python-git-info` package.

If no git information is retrievable, e.g. because the project's `base_dir` does not contain a git repository, will still return a dict, but with the `have_git_info` entry set to False. Otherwise, the git information will be in the `latest_commit` entry.

- Parameters
include_patch_info (bool, optional) – If True, will attempt a subprocess call to `git` and store patch information alongside in the `diff` entry. In that case, the `dirty` entry will denote whether there were uncommitted changes.
- Returns
A dict containing information about the associated git repo.
- Return type
- class utopya.project_registry.ProjectRegistry(registry_dir: Optional[str] = None)[source]#
Bases:
utopya._yaml_registry.registry.YAMLRegistry
The project registry
- __init__(registry_dir: Optional[str] = None)[source]#
Initializes the project registry, loading available entries from the registry directory in the utopya config directory.
This also creates the `projects` directory, if not created yet.

- Parameters
registry_dir (str, optional) – A custom projects directory
- register(*, base_dir: str, info_file: Optional[str] = None, custom_project_name: Optional[str] = None, require_matching_names: Optional[bool] = None, exists_action: str = 'raise') utopya.project_registry.Project [source]#
Register or update information of a project.
- Parameters
base_dir (str) – Project base directory
info_file (str, optional) – Path to info file which contains further path information and metadata (may be relative to base directory). If not given, will use some defaults to search for it.
custom_project_name (str, optional) – Custom project name, overwrites the one given in the info file
require_matching_names (bool, optional) – If set, will require that the custom project name is equal to the one given in the project info file. This allows checking that the file content does not diverge from some outside state.
exists_action (str, optional) – Action to take upon existing project
- Returns
Project information for the new or validated project
- Return type
- utopya.project_registry.PROJECTS = <utopya.project_registry.ProjectRegistry object>#
The package-wide project registry
utopya.reporter module#
Implementation of the reporter framework which can be used to report on the progress or result of operations within utopya.
- class utopya.reporter.ReportFormat(*, parser: Callable, writers: List[Callable], min_report_intv: Optional[float] = None)[source]#
Bases:
object
A report format aggregates callables for a single report parser and potentially multiple report writers. As a whole, it contains all arguments needed to generate a certain kind of report.
It is used in `utopya.reporter.Reporter` and derived classes, which actually implement the parsers and writers.
Initializes a ReportFormat object, which gathers callables needed to create a report in a certain format.
- Parameters
parser (Callable) – The parser method to use
writers (List[Callable]) – The writer method(s) to use
min_report_intv (float, optional) – The minimum report interval of reports in this format. Determines the time (in seconds) that needs to have passed before the next report will be emitted.
- property min_report_intv: Optional[datetime.timedelta]#
Returns the minimum report interval, i.e. the time that needs to have passed between two reports.
- property reporting_blocked: bool#
Determines whether this ReportFormat may be blocked from emission, e.g. because of the minimum report interval not having passed yet.
If no minimum report interval is given, will always return False. Otherwise checks if at least that interval has passed since the last report.
- class utopya.reporter.Reporter(*, report_formats: Optional[Union[List[str], Dict[str, dict]]] = None, default_format: Optional[str] = None, report_dir: Optional[str] = None, suppress_cr: bool = False)[source]#
Bases:
object
The Reporter class holds general reporting capabilities.
It needs to be subclassed in order to specialize its reporting functions.
- __init__(*, report_formats: Optional[Union[List[str], Dict[str, dict]]] = None, default_format: Optional[str] = None, report_dir: Optional[str] = None, suppress_cr: bool = False)[source]#
Initialize the Reporter base class.
- Parameters
report_formats (Union[List[str], Dict[str, dict]], optional) – The report formats to use with this reporter. If given as list of strings, the strings are the names of the report formats as well as those of the parsers; all other parameters are the defaults. If given as dict of dicts, the keys are the names of the formats and the inner dicts are the parameters to create report formats from.
default_format (str, optional) – The name of the default report format; if None is given, the .report method requires the name of a report format.
report_dir (str, optional) – if reporting to a file; this is the base directory that is reported to.
suppress_cr (bool, optional) – Whether to suppress carriage return characters in writers. This option is useful when the reporter is not the only class that writes to a stream.
- property default_format: Union[None, utopya.reporter.ReportFormat]#
Returns the default report format or None, if not set.
- property suppress_cr: bool#
Whether to suppress a carriage return. Objects using the reporter can set this property to communicate that they will be putting content into the stdout stream as well. The writers can check this property and adjust their behaviour accordingly.
- add_report_format(name: str, *, parser: Optional[str] = None, write_to: Union[str, Dict[str, dict]] = 'stdout', min_report_intv: Optional[float] = None, rf_kwargs: Optional[dict] = None, **parser_kwargs)[source]#
Add a report format to this reporter.
- Parameters
name (str) – The name of this format
parser (str, optional) – The name of the parser; if not given, the name of the report format is assumed
write_to (Union[str, Dict[str, dict]], optional) – The name of the writer. If this is a dict of dicts, the keys will be interpreted as the names of the writers and the nested dicts as the `**kwargs` to the writer functions.
min_report_intv (float, optional) – The minimum report interval (in seconds) for this report format
rf_kwargs (dict, optional) – Further kwargs to ReportFormat.__init__
**parser_kwargs – The kwargs to the parser function
- Raises
ValueError – A report format with this name already exists
- report(report_format: Optional[str] = None, **kwargs) bool [source]#
Create a report with the given format; if none is given, the default format is used.
- Parameters
report_format (str, optional) – The report format to use
**kwargs – Passed on to the ReportFormat.report() call
- Returns
Whether there was a report
- Return type
- Raises
ValueError – If no default format was set and no report format name was given
- parse_and_write(*, parser: Union[str, Callable], write_to: Union[str, Callable], **parser_kwargs)[source]#
This function allows selecting a parser and writer explicitly.
- Parameters
- _resolve_parser(parser: Union[str, Callable], **parser_kwargs) Callable [source]#
Given a string or a callable, returns the corresponding callable.
- Parameters
parser (Union[str, Callable]) – If a callable is already given, returns that; otherwise looks for a parser method with the given name in the attributes of this class.
**parser_kwargs – Arguments that should be passed to the parser. If given, a new function is created where these arguments are already included.
- Returns
The desired parser function
- Return type
Callable
- Raises
ValueError – If no parser with the given name is available
- _resolve_writers(write_to) Dict[str, Callable] [source]#
Resolves the given argument to a list of callable writer functions.
- Parameters
write_to –
a specification of the writers to use. Allows many different ways of specifying the writer functions, depending on the type of the argument:
str: the name of the writer method of this reporter
Callable: the writer function to use
sequence of str and/or Callable: the names and/or functions to use
Dict[str, dict]: the names of the writer functions and additional keyword arguments.
If the type is wrong, will raise.
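The normalization of the `write_to` argument can be sketched like this (a simplified illustration of the dispatch; the real method resolves names to writer callables, which is omitted here):

```python
def resolve_writers(write_to) -> dict:
    # Normalize the various accepted forms to {writer name: kwargs}
    if isinstance(write_to, str):
        return {write_to: {}}
    if callable(write_to):
        return {write_to.__name__: {}}
    if isinstance(write_to, dict):
        return dict(write_to)  # already {name: kwargs}
    if isinstance(write_to, (list, tuple)):
        out = {}
        for w in write_to:
            out.update(resolve_writers(w))
        return out
    raise TypeError(f"Invalid write_to argument of type {type(write_to)}!")
```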
- Returns
the writers (key: name, value: writer method)
- Return type
Dict[str, Callable]
- Raises
TypeError – Invalid `write_to` argument
ValueError – A writer with that name was already added or a writer with the given name is not available.
- _write_to_stdout(s: str, *, flush: bool = True, **print_kws)[source]#
Writes the given string to stdout using the print function.
- _write_to_stdout_noreturn(s: str, *, prepend=' ')[source]#
Writes to stdout without ending the line. Always flushes.
- _write_to_log(s: str, *, lvl: int = 10, skip_if_empty: bool = False)[source]#
Writes the given string via the logging module.
- _write_to_file(s: str, *, path: str = '_report.txt', mode: str = 'w', skip_if_empty: bool = False)[source]#
Writes the given string to a file
- Parameters
s (str) – The string to write
path (str, optional) – The path to write to; will be assumed relative to the report_dir attribute. If that is not given, path needs to be absolute. By default, assumes that a report_dir is given.
mode (str, optional) – Writing mode of that file
skip_if_empty (bool, optional) – Whether to skip writing if s is empty
- Raises
ValueError – If report_dir was not set and path is relative.
- class utopya.reporter.WorkerManagerReporter(wm: utopya.workermanager.WorkerManager, *, mv: utopya.multiverse.Multiverse = None, **reporter_kwargs)[source]#
Bases:
utopya.reporter.Reporter
This class specializes the base Reporter to report on the WorkerManager state and its progress.
- TTY_MARGIN = 4#
Margin to use when writing to terminal
- PROGRESS_BAR_SYMBOLS = {'active': '░', 'active_progress': '▒', 'finished': '▓', 'space': ' '}#
Symbols to use in progress bar parser
- __init__(wm: utopya.workermanager.WorkerManager, *, mv: utopya.multiverse.Multiverse = None, **reporter_kwargs)[source]#
Initialize the specialized reporter for the WorkerManager.
It is aware of the WorkerManager and may additionally have access to the Multiverse it is embedded in, which provides additional information to report parsers.
- Parameters
wm (utopya.workermanager.WorkerManager) – The associated WorkerManager instance
mv (utopya.multiverse.Multiverse, optional) – The Multiverse this reporter is used in. If this is provided, it can be used in report parsers, e.g. to provide additional information on simulations.
**reporter_kwargs – Passed on to parent method
- property wm: utopya.workermanager.WorkerManager#
Returns the associated
WorkerManager
- property task_counters: collections.OrderedDict#
Returns a dict of task counters containing the following entries:
total: total number of registered WorkerManager tasks
active: number of currently active tasks
finished: number of finished tasks, including tasks that were stopped via a stop condition
stopped: number of tasks for which stop conditions were fulfilled, see Stop Conditions
- property wm_active_tasks_progress: float#
The active tasks’ progress. If there are no active tasks in the worker manager, returns 0.
- property wm_elapsed: Optional[datetime.timedelta]#
Seconds elapsed since start of working or None if not yet started
- property wm_times: dict#
Return the characteristics of WorkerManager times. Calls get_progress_info() without any additional arguments.
- register_task(task: utopya.task.WorkerTask)[source]#
Given the task object, extracts and stores some information like its run time or its exit code. Exit codes are aggregated over multiple registrations.
This can be used as a callback function from a WorkerTask object.
- Parameters
task (utopya.task.WorkerTask) – The WorkerTask to extract information from.
- calc_runtime_statistics(min_num: int = 10) Optional[collections.OrderedDict] [source]#
Calculates the current runtime statistics.
- Parameters
min_num (int, optional) – Minimum number of runtimes that need to be registered for these statistics to actually be computed. If below this number, will return None.
- Returns
- The runtime statistics or None, if there were too few entries.
- Return type
Union[OrderedDict, None]
- get_progress_info(**eta_options) Dict[str, float] [source]#
Compiles a dict containing progress information for the current work session.
- Parameters
**eta_options – Passed on to _compute_est_left(), the method calculating est_left.
- Returns
- Progress information. Guaranteed to contain the keys start, now, elapsed, est_left, est_end, and end.
- Return type
Dict[str, float]
- _compute_est_left(*, progress: float, elapsed: datetime.timedelta, mode: str = 'from_start', progress_buffer_size: int = 60) datetime.timedelta [source]#
Computes the estimated time left until the end of the work session (ETA) using the current progress value and the elapsed time. Depending on mode, additional information may be included in the calculation.
- Parameters
progress (float) – The current progress value, in (0, 1]
elapsed (datetime.timedelta) – The elapsed time since start
mode (str, optional) – By which mode to calculate the ETA. Available modes are:
from_start, where the ETA is computed from the start of the work session.
from_buffer, where the ETA is computed from a more recent point during the work session. This uses a buffer to keep track of recent progress and computes the ETA against the oldest record (controlled by the progress_buffer_size argument), giving more accurate estimates for long-running work sessions.
progress_buffer_size (int, optional) – The size of the ring buffer used in from_buffer mode.
- Returns
- Estimate for how much time is left until the end of the work session.
- Return type
datetime.timedelta
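The two modes can be sketched as follows; the function and class names are illustrative, not utopya's implementation, but the arithmetic follows the description above (linear extrapolation from the start, or against the oldest record in a ring buffer of recent progress measurements):

```python
from collections import deque
from datetime import timedelta


def est_left_from_start(progress: float, elapsed: timedelta) -> timedelta:
    # from_start mode: linear extrapolation from the session start,
    #   est_left = elapsed * (1 - progress) / progress
    return timedelta(seconds=elapsed.total_seconds() * (1 - progress) / progress)


class ProgressBuffer:
    # from_buffer mode: the ETA is computed against the oldest
    # (progress, elapsed) record in a ring buffer of recent measurements.
    def __init__(self, size: int = 60):
        self.buf = deque(maxlen=size)

    def est_left(self, progress: float, elapsed: timedelta) -> timedelta:
        self.buf.append((progress, elapsed))
        p0, t0 = self.buf[0]
        dp = progress - p0
        if dp <= 0:
            # Not enough recent records yet; fall back to from_start
            return est_left_from_start(progress, elapsed)
        rate = (elapsed - t0).total_seconds() / dp  # seconds per unit progress
        return timedelta(seconds=rate * (1 - progress))
```

For long-running sessions whose rate changes over time, the buffered estimate tracks the recent rate instead of averaging over the whole session.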
- _parse_task_counters(*, report_no: Optional[int] = None) str [source]#
Return a string that shows the task counters of the WorkerManager
- Parameters
report_no (int, optional) – A counter variable passed by the ReportFormat call, indicating how often this parser was called so far.
- Returns
A str representation of the task counters of the WorkerManager
- Return type
str
- _parse_progress(*, report_no: Optional[int] = None) str [source]#
Returns a progress string
- Parameters
report_no (int, optional) – A counter variable passed by the ReportFormat call, indicating how often this parser was called so far.
- Returns
A simple progress indicator
- Return type
str
- _parse_progress_bar(*, num_cols: Union[str, int] = 'fixed', fstr: str = ' ╠{ticks[0]:}{ticks[1]:}{ticks[2]:}{ticks[3]:}╣ {info:}{times:}', info_fstr: str = '{total_progress:>5.1f}% ', show_times: bool = False, times_fstr: str = '| {elapsed:} elapsed | ~{est_left:} left ', times_fstr_final: str = '| finished in {elapsed:} ', times_kwargs: dict = {}, report_no: Optional[int] = None) str [source]#
Returns a progress bar.
It shows the number of finished and active tasks as well as a total-progress percentage.
- Parameters
num_cols (Union[str, int], optional) – The number of columns available for creating the progress bar. Can also be the string adaptive to poll the terminal size upon each call, or fixed to use the number of columns determined at import time.
fstr (str, optional) – The format string for the final output. Should contain the ticks 4-tuple, which makes up the progress bar, and can optionally contain the info and times segments, formatted using the respective format string arguments.
info_fstr (str, optional) – The format string for the info section of the final output. Available keys: total_progress, active_progress, and cnt, the task counters dictionary, see task_counters.
show_times (bool, optional) – Whether to show a short version of the results of the times parser
times_fstr (str, optional) – Format string for times information
times_fstr_final (str, optional) – Format string for times information once the work session has ended
times_kwargs (dict, optional) – Passed on to the times parser. Only used if show_times is set.
report_no (int, optional) – A counter variable passed by the ReportFormat call, indicating how often this parser was called so far.
- Returns
The one-line progress bar
- Return type
str
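The tick-filling logic can be sketched as follows, using the PROGRESS_BAR_SYMBOLS documented above. The `make_progress_bar` helper is hypothetical and heavily simplified: the real parser additionally renders the info and times segments via fstr, and the active-tasks share (omitted here) fills the third segment:

```python
# Symbol set as documented for PROGRESS_BAR_SYMBOLS above
SYMBOLS = {"active": "░", "active_progress": "▒", "finished": "▓", "space": " "}


def make_progress_bar(width: int, *, finished: float, active_progress: float = 0.0) -> str:
    # Fill `width` columns with the four tick segments:
    # finished, active-progress, active, and remaining space
    n_fin = round(width * finished)
    n_act_prog = round(width * active_progress)
    n_act = 0  # share of remaining active tasks, omitted in this sketch
    n_space = width - n_fin - n_act_prog - n_act
    return (SYMBOLS["finished"] * n_fin
            + SYMBOLS["active_progress"] * n_act_prog
            + SYMBOLS["active"] * n_act
            + SYMBOLS["space"] * n_space)
```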
- _parse_times(*, fstr: str = 'Elapsed: {elapsed:<8s} | Est. left: {est_left:<8s} | Est. end: {est_end:<10s}', timefstr_short: str = '%H:%M:%S', timefstr_full: str = '%d.%m., %H:%M:%S', use_relative: bool = True, times: Optional[dict] = None, report_no: Optional[int] = None, **progress_info_kwargs) str [source]#
Parses the WorkerManager’s time information, including estimated time left or others.
- Parameters
fstr (str, optional) – The main format string; gets as keys the results of the WorkerManager time information. Available keys: elapsed, est_left, est_end, start, now, end.
timefstr_short (str, optional) – A time format string for absolute dates; short version.
timefstr_full (str, optional) – A time format string for absolute dates; long (ideally: full) version.
use_relative (bool, optional) – Whether to use relative dates for a date difference of one day, e.g. Today, 13:37.
times (dict, optional) – A dict of times to use; this is mainly for testing purposes!
report_no (int, optional) – The report number passed by ReportFormat
**progress_info_kwargs – Passed on to get_progress_info(), the method calculating progress
- Returns
A string representation of the time information
- Return type
str
- _parse_runtime_stats(*, fstr: str = ' {k:<13s} {v:}', join_char='\n', ms_precision: int = 1, report_no: Optional[int] = None) str [source]#
Parses the runtime statistics dict into a multiline string
- Parameters
fstr (str, optional) – The format string to use. Gets passed the keys k and v, where k is the name of the entry and v its value. Note that v is a non-numeric value.
join_char (str, optional) – The join character / string to join the elements together.
ms_precision (int, optional) – Number of digits to represent the milliseconds part of the runtimes.
report_no (int, optional) – A counter variable passed by the ReportFormat call, indicating how often this parser was called so far.
- Returns
The multi-line runtime statistics
- Return type
str
- _parse_report(*, fstr: str = ' {k:<{w:}s} {v:}', min_num: int = 4, report_no: Optional[int] = None, show_individual_runtimes: bool = True, task_label_singular: str = 'task', task_label_plural: str = 'tasks') str [source]#
Parses a report for all tasks that were being worked on into a multiline string. The headings can be adjusted by keyword arguments.
- Parameters
fstr (str, optional) – The format string to use. Gets passed the keys k and v, where k is the name of the entry and v its value. Note that this format string is also used with v being a non-numeric value. Also, w can be used to have a key column of constant width.
min_num (int, optional) – The minimum number of universes needed to calculate runtime statistics.
report_no (int, optional) – A counter variable passed by the ReportFormat call, indicating how often this parser was called so far.
show_individual_runtimes (bool, optional) – Whether to report individual universe runtimes; default: True. This should be disabled if there is a huge number of universes.
task_label_singular (str, optional) – The label to use in the report when referring to a single task.
task_label_plural (str, optional) – The label to use in the report when referring to multiple tasks.
- Returns
The multi-line simulation report string
- Return type
str
- _parse_pspace_info(*, fstr: str, report_no: Optional[int] = None) str [source]#
Provides information about the parameter space.
Extracts the parameter_space from the associated Multiverse's meta-configuration and provides information on it.
If there are multiple tasks specified, it is assumed that a sweep is or was being carried out, and an information string is retrieved from the paramspace.paramspace.ParamSpace object and returned. If only a single task was defined, returns an empty string.
- Parameters
report_no (int, optional) – A counter variable passed by the ReportFormat call, indicating how often this parser was called so far.
- Returns
- If there is more than one task, returns the result of paramspace.paramspace.ParamSpace.get_info_str(). If not, returns a string denoting that there was only one task.
- Return type
str
- _write_to_file(*args, path: str = '_report.txt', cluster_mode_path: str = '{0:}_{node_name:}{ext:}', **kwargs)[source]#
Overloads the parent method with capabilities needed in cluster mode
All args and kwargs are passed through. If in cluster mode, the path is changed such that it includes the name of the node.
- Parameters
*args – Passed on to parent method
path (str, optional) – The path to save to
cluster_mode_path (str, optional) – The format string to use for the path in cluster mode. It is required to contain the format key {0:}, which retains the given path with its extension split off; the extension is available via ext (already including the dot). Additional format keys: node_name, job_id.
**kwargs – Passed on to parent method
utopya.stop_conditions module#
This module implements the StopCondition class, which is used by the WorkerManager to stop a worker process in certain situations.
In addition, it implements a set of basic stop condition functions and provides the stop_condition_function() decorator, which is required to make them accessible by name.
- utopya.stop_conditions.SIG_STOPCOND = 'SIGUSR1'#
Signal to use for stopping workers with fulfilled stop conditions
- utopya.stop_conditions.STOP_CONDITION_FUNCS: Dict[str, Callable] = {'check_monitor_entry': <function check_monitor_entry>, 'timeout_wall': <function timeout_wall>}#
Registered stop condition functions are stored in this dictionary. These functions evaluate whether a certain stop condition is actually fulfilled.
To that end, a WorkerTask object is passed to these functions; the information it contains can be used to determine whether the condition is fulfilled. The signature of these functions is: (task: WorkerTask, **kws) -> bool
- utopya.stop_conditions._FAILED_MONITOR_ENTRY_CHECKS = []#
Keeps track of failed monitor entry checks in the check_monitor_entry() stop condition function in order to avoid repetitive warnings.
- class utopya.stop_conditions.StopCondition(*, to_check: Optional[List[dict]] = None, name: Optional[str] = None, description: Optional[str] = None, enabled: bool = True, func: Optional[Union[Callable, str]] = None, **func_kwargs)[source]#
Bases:
object
A StopCondition object holds information on the conditions in which a worker process should be stopped.
- __init__(*, to_check: Optional[List[dict]] = None, name: Optional[str] = None, description: Optional[str] = None, enabled: bool = True, func: Optional[Union[Callable, str]] = None, **func_kwargs)[source]#
Create a new stop condition object.
- Parameters
to_check (List[dict], optional) – A list of dicts that hold the functions to call and the arguments to call them with. The only requirement for each dict is that the func key is available. All other keys are unpacked and passed as kwargs to the given function. The func key can be either a callable or a string corresponding to a name in the stop condition function registry.
name (str, optional) – The name of this stop condition
description (str, optional) – A short description of this stop condition
enabled (bool, optional) – Whether this stop condition should be checked; if False, it will be created but will always be unfulfilled when checked.
func (Union[Callable, str], optional) – (For the short syntax only!) If no to_check argument is given, a function can be given here that will be the only one checked. If this argument is a string, it is also resolved from the stop condition function registry.
**func_kwargs – (For the short syntax) The kwargs that are passed to the single stop condition function
- property fulfilled_for: Set[utopya.task.Task]#
The set of tasks this stop condition was fulfilled for
- static _resolve_sc_funcs(to_check: List[dict], func: Union[str, Callable], func_kwargs: dict) List[tuple] [source]#
Resolves the functions and kwargs that are to be checked.
The callable is either retrieved from the module-level stop condition function registry or, if the given func is already a callable, that one is used.
- __str__() str [source]#
A string representation for this StopCondition, including the name and, if given, the description.
- fulfilled(task: utopya.task.Task) bool [source]#
Checks if the stop condition is fulfilled for the given worker, using the information from the dict.
All given stop condition functions are evaluated; if all of them return True, this method will also return True.
Furthermore, if the stop condition is fulfilled, it is added to the task's set of fulfilled stop conditions.
- Parameters
task (utopya.task.Task) – Task object that is to be checked
- Returns
- Whether all stop condition functions returned True for the given worker and its current information
- Return type
bool
- yaml_tag = '!stop-condition'#
- classmethod to_yaml(representer, node)[source]#
Creates a yaml representation of the StopCondition object by storing the initialization kwargs as a yaml mapping.
- Parameters
representer (ruamel.yaml.representer) – The representer module
node (StopCondition) – The node, i.e. an instance of this class
- Returns
a yaml mapping that is able to recreate this object
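Given the yaml_tag above and the __init__ signature, a stop condition entry in a YAML configuration might look like the following. Only the tag and the keys are taken from the documentation above; where such an entry is placed within a run configuration depends on the project setup and is an assumption here:

```yaml
# Hypothetical configuration entry, using the short syntax:
# `func` names a registered stop condition function; its kwargs follow.
my_timeout: !stop-condition
  name: wall timeout
  description: stops a worker after one hour of wall time
  func: timeout_wall
  seconds: 3600
```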
- utopya.stop_conditions.stop_condition_function(f: Callable)[source]#
A decorator that registers the decorated callable in the module-level stop condition function registry. The callable's __name__ attribute will be used as the key.
- Parameters
f (Callable) – A callable that is to be added to the function registry.
- Raises
AttributeError – If the name already exists in the registry
- utopya.stop_conditions.timeout_wall(task: utopya.task.WorkerTask, *, seconds: float) bool [source]#
Checks the wall timeout of the given worker
- Parameters
task (utopya.task.WorkerTask) – The WorkerTask object to check
seconds (float) – After how many seconds to trigger the wall timeout
- Returns
Whether the timeout is fulfilled
- Return type
bool
- utopya.stop_conditions.check_monitor_entry(task: utopya.task.WorkerTask, *, entry_name: str, operator: str, value: float) bool [source]#
Checks if a monitor entry compares in a certain way to a given value
- Parameters
task (utopya.task.WorkerTask) – The WorkerTask object to check
entry_name (str) – The name of the monitor entry, providing the value on the left-hand side of the operator
operator (str) – The binary operator to use
value (float) – The right-hand side value to compare to
- Returns
Result of op(entry, value)
- Return type
bool
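The comparison such a check performs can be sketched with the standard operator module; the `_OPS` mapping and `compare_entry` helper are illustrative, not utopya's actual code (the set of supported operator strings is an assumption):

```python
import operator

# Map operator strings to binary comparison functions
_OPS = {
    "<": operator.lt, "<=": operator.le,
    "==": operator.eq, "!=": operator.ne,
    ">=": operator.ge, ">": operator.gt,
}


def compare_entry(entry: float, op: str, value: float) -> bool:
    # Result of op(entry, value), with the monitor entry on the left-hand side
    return _OPS[op](entry, value)
```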
utopya.task module#
The Task class supplies a container for all information needed for a task.
The WorkerTask and ProcessTask classes specialize on tasks for the WorkerManager that work on subprocesses or multiprocessing processes.
- utopya.task._ANSI_ESCAPE = re.compile('\\x1B(?:[@-Z\\\\-_]|\\[[0-?]*[ -/]*[@-~])')#
A regex pattern to remove ANSI escape characters, needed for stream saving
- utopya.task._follow(f: _io.TextIOWrapper, delay: float = 0.05, should_stop: typing.Callable = <function <lambda>>) Generator[str, None, None] [source]#
Generator that follows the output written to the given stream object and yields each new line written to it. If no output is retrieved, there will be a delay to reduce processor load.
The should_stop argument may be a callable that will lead to breaking out of the waiting loop. If it is not given, the loop will only break if reading from the stream f is no longer possible, e.g. because it was closed.
- utopya.task.enqueue_lines(*, queue: queue.Queue, stream: TextIO, follow: bool = False, parse_func: Optional[Callable] = None) None [source]#
From the given text stream, read line-buffered lines and add them to the provided queue as 2-tuples, (line, parsed object).
This function is meant to be passed to an individual thread in which it can read individual lines separately from the main thread. Before exiting this function, the stream is closed.
- Parameters
queue (queue.Queue) – The queue object to put the read line and parsed objects into.
stream (TextIO) – The stream identifier. If this is not a text stream, be aware that the elements added to the queue might need decoding.
follow (bool, optional) – Whether to use the _follow() function instead of iter(stream.readline). This should be selected if the stream is file-like rather than sys.stdout-like.
parse_func (Callable, optional) – A parse function that the read line is passed through. This should be a unary function that either returns a successfully parsed line or None.
- utopya.task.parse_yaml_dict(line: str, *, start_str: str = '!!map') Union[None, dict] [source]#
A yaml parse function that can be passed to enqueue_lines. It only tries parsing the line if it starts with the provided start string.
It tries to decode the line and parse it as YAML. If that fails, it will still try to decode the string. If that fails yet again, the unchanged line will be returned.
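The reading pattern described for enqueue_lines() can be sketched with a simplified standalone version; the parameter is named `q` here to avoid shadowing the queue module, and the parse function is a stand-in for parse_yaml_dict:

```python
import io
import queue
import threading
from typing import Callable, Optional, TextIO


def enqueue_lines(*, q: queue.Queue, stream: TextIO,
                  parse_func: Optional[Callable] = None) -> None:
    # Read line-buffered lines from the stream, add them to the queue as
    # (line, parsed object) 2-tuples, and close the stream before exiting.
    parse = parse_func if parse_func is not None else (lambda line: None)
    for line in iter(stream.readline, ""):
        line = line.rstrip("\n")
        q.put_nowait((line, parse(line)))
    stream.close()


# Feed the reader from a separate thread, separately from the main thread:
q = queue.Queue()
stream = io.StringIO("starting up...\n!!map {progress: 0.5}\n")
parse = lambda l: l.split(" ", 1)[1] if l.startswith("!!map") else None
t = threading.Thread(target=enqueue_lines,
                     kwargs=dict(q=q, stream=stream, parse_func=parse))
t.start()
t.join()
```

The main thread can then drain the queue at its own pace, which is the non-blocking stream-reading pattern WorkerTask builds on.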
- class utopya.task.Task(*, name: Optional[str] = None, priority: Optional[float] = None, callbacks: Optional[Dict[str, Callable]] = None, progress_func: Optional[Callable] = None)[source]#
Bases:
object
The Task is a container for a task handled by the WorkerManager.
It aims to provide the necessary interfaces for the WorkerManager to easily associate tasks with the corresponding workers and vice versa.
- __init__(*, name: Optional[str] = None, priority: Optional[float] = None, callbacks: Optional[Dict[str, Callable]] = None, progress_func: Optional[Callable] = None)[source]#
Initialize a Task object.
- Parameters
name (str, optional) – The task’s name. If none is given, the generated uuid will be used.
priority (float, optional) – The priority of this task; if None, default is +np.inf, i.e. the lowest priority. If two priority values are the same, the task created earlier has a higher priority.
callbacks (Dict[str, Callable], optional) – A dict of callback funcs that are called at different points of the life of this task. The function gets passed as only argument this task object.
progress_func (Callable, optional) – Invoked by the progress property and used to calculate the progress, given the current task object as argument
- _name#
- _priority#
- _uid#
- callbacks#
- _progress_func#
- _stop_conditions#
- property progress: float#
If a progress function is given, invokes it; otherwise returns 0
This also performs checks that the progress is in [0, 1]
- property fulfilled_stop_conditions: Set[StopCondition]#
The set of fulfilled stop conditions for this task. Typically, this is set by the StopCondition itself as part of its evaluation in its fulfilled() method.
- class utopya.task.WorkerTask(*, setup_func: Optional[Callable] = None, setup_kwargs: Optional[dict] = None, worker_kwargs: Optional[dict] = None, **task_kwargs)[source]#
Bases:
utopya.task.Task
A specialisation of Task for use in the WorkerManager.
It is able to spawn a worker process using subprocess.Popen, executing the task in a non-blocking manner. At the same time, the worker's stream can be read in via another non-blocking thread and stream information can be parsed. Furthermore, this class provides most of the interface for signalling the spawned process.
For an equivalent class that uses multiprocessing instead of subprocess, see the derived MPProcessTask.
- STREAM_PARSE_FUNCS = {'default': None, 'yaml_dict': <function parse_yaml_dict>}#
- __init__(*, setup_func: Optional[Callable] = None, setup_kwargs: Optional[dict] = None, worker_kwargs: Optional[dict] = None, **task_kwargs)[source]#
Initialize a WorkerTask.
This is a specialization of Task for use in the WorkerManager.
- Parameters
setup_func (Callable, optional) – The setup function to use before this task is spawned; this allows dynamically handling the worker arguments. It is called with the worker_kwargs keyword argument, containing the dict passed here. Additionally, setup_kwargs are unpacked into the function call. The function should return a dict that is then used as worker_kwargs for the individual task.
setup_kwargs (dict, optional) – The keyword arguments unpacked into the setup_func call.
worker_kwargs (dict, optional) – The keyword arguments needed to spawn the worker. Note that these are also passed to setup_func and, if a setup_func is given, the return value of that function will be used for the worker_kwargs.
**task_kwargs – Arguments to be passed to __init__(), including the callbacks dictionary among other things.
- Raises
ValueError – If neither setup_func nor worker_kwargs were given, thus lacking information on how to spawn the worker.
- setup_func#
- setup_kwargs#
- worker_kwargs#
- _worker#
- _worker_pid#
- _worker_status#
- streams#
- profiling#
- property worker: subprocess.Popen#
The associated worker process object or None, if not yet created.
- property worker_status: Optional[int]#
The worker process's current status, or False if no worker was spawned yet.
Note that this invokes a poll to the worker process if one was spawned.
- Returns
- Current worker status. False, if there was no worker associated yet.
- Return type
Union[int, None]
- spawn_worker() subprocess.Popen [source]#
Spawn a worker process using subprocess.Popen and manage the corresponding queue and thread for reading the stdout stream.
If there is a setup_func, this function will be called first.
Afterwards, the worker process is spawned from the worker_kwargs returned by that function or, if no setup_func was given, from those given during initialisation, and associated with this task.
- Returns
The created process object
- Return type
- Raises
RuntimeError – If a worker was already spawned for this task.
TypeError – For an invalid args argument
- read_streams(stream_names: list = 'all', *, max_num_reads: int = 10, forward_directly: bool = False) None [source]#
Read the streams associated with this task’s worker.
- Parameters
stream_names (list, optional) – The list of stream names to read. If all (default), will read all streams.
max_num_reads (int, optional) – How many lines should be read from the buffer. For -1, reads the whole buffer.
Warning
Do not make this value too large, as it could block the whole reader thread of this worker.
forward_directly (bool, optional) – Whether to call the forward_streams() method; this is done before the callback and can be useful if the callback should not happen before the streams are forwarded.
- Returns
None
- save_streams(stream_names: list = 'all', *, final: bool = False)[source]#
For each stream, checks if it is to be saved, and if yes: saves it.
The saving location is stored in the streams dict. The relevant keys are the save flag and the save_path string.
Note that this function does not save the whole stream log, but only those part of the stream log that have not already been saved. The position up to which the stream was saved is stored under the lines_saved key in the stream dict.
- Parameters
stream_names (list, optional) – The list of stream names to check. If 'all' (default), will check all streams for whether the save flag is set.
save_raw (bool, optional) – If True, stores the raw log; otherwise stores the regular log, i.e. without the lines that could be parsed.
final (bool, optional) – If True, this is regarded as the final save operation for the stream, which will lead to additional information being saved to the end of the log.
remove_ansi (bool, optional) – If True, will remove ANSI escape characters (e.g. from colored logging) from the log before saving to file.
- Returns
None
- forward_streams(stream_names: list = 'all', forward_raw: bool = False) bool [source]#
Forwards the streams to stdout, either via logging module or print
This function can be periodically called to forward the part of the stream logs that was not already forwarded to stdout.
The information for that is stored in the stream dict. The log_level entry is used to determine whether the logging module should be used or (in case of None) the print method.
- signal_worker(signal: str) tuple [source]#
Sends a signal to this WorkerTask’s worker.
- Parameters
signal (str) – The signal to send. Needs to be a valid signal name, i.e. available in the Python signal module.
- Raises
ValueError – When an invalid signal argument was given
- Returns
(signal: str, signum: int) sent to the worker
- Return type
- _prepare_process_args(*, args: tuple, read_stdout: bool, **kwargs) Tuple[tuple, dict] [source]#
Prepares the arguments that will be passed to subprocess.Popen
- _spawn_process(args, **popen_kwargs)[source]#
This helper takes care only of spawning the actual process and potential error handling.
It can be subclassed to spawn a different kind of process
- _spawn_worker(*, args: tuple, popen_kwargs: Optional[dict] = None, read_stdout: bool = True, **_) subprocess.Popen [source]#
Helper function to spawn the worker subprocess
- _setup_stream_reader(stream_name: str, *, stream, parser: str = 'default', follow: bool = False, save_streams: bool = False, save_streams_to: Optional[str] = None, save_raw: bool = True, remove_ansi: bool = False, forward_streams: bool = False, forward_raw: bool = True, streams_log_lvl: Optional[int] = None, **_)[source]#
Sets up the stream reader thread
- utopya.task._target_wrapper(target, streams: dict, *args, **kwargs)[source]#
A wrapper around the multiprocessing.Process target function which takes care of stream handling.
- class utopya.task.PopenMPProcess(args: tuple, kwargs: dict = {}, stdin=None, stdout=None, stderr=None, bufsize: int = - 1, encoding: str = 'utf8')[source]#
Bases:
object
A wrapper around multiprocessing.Process that replicates (large parts of) the interface of subprocess.Popen.
- __init__(args: tuple, kwargs: dict = {}, stdin=None, stdout=None, stderr=None, bufsize: int = - 1, encoding: str = 'utf8')[source]#
Creates a multiprocessing.Process and starts it.
The interface here is a subset of subprocess.Popen, making available those features that make sense for a multiprocessing.Process, mainly: stream reading.
Consequently, the interface is quite a bit different from that of multiprocessing.Process. The most important arguments of that interface are target, args, and kwargs, which can be set as follows:
target will be args[0]
args will be args[1:]
kwargs is an additional keyword argument that is typically not part of the subprocess.Popen interface.
Regarding the stream arguments, the following steps are taken to attach custom pipes: if any argument is subprocess.PIPE or another stream specifier that is not subprocess.DEVNULL, a new multiprocessing.Pipe and a reader thread will be established.
Warning
This will always use spawn as the start method for the process!
- Parameters
args (tuple) – The target callable (args[0]) and subsequent positional arguments.
kwargs (dict, optional) – Keyword arguments for the target.
stdin (None, optional) – The stdin stream
stdout (None, optional) – The stdout stream
stderr (None, optional) – The stderr stream
bufsize (int, optional) – The buffer size to use.
encoding (str, optional) – The encoding to use for the streams; should typically remain utf8, as using other values is not encouraged!
- _prepare_target_args(args: tuple, *, stdin, stdout, stderr) Tuple[Callable, tuple] [source]#
Prepares the target callable and stream objects
- poll() Optional[int] [source]#
Check if the child process has terminated. Sets and returns the returncode attribute; otherwise returns None.
With the underlying process being a multiprocessing.Process, this method is equivalent to the returncode property.
- wait(timeout=None)[source]#
Wait for the process to finish; blocking call.
This method is not yet implemented, but will be!
- communicate(input=None, timeout=None)[source]#
Communicate with the process.
This method is not yet implemented! Not sure if it will be …
- property args: tuple#
The args argument to this process. Note that the returned tuple includes the target callable as its first entry.
Note that these have already been passed to the process; changing them has no effect.
- property kwargs#
Keyword arguments passed to the target callable.
Note that these have already been passed to the process; changing them has no effect.
- property stdin#
The attached stdin stream
- property stdout#
The attached stdout stream
- property stderr#
The attached stderr stream
- property pid#
Process ID of the child process
- class utopya.task.MPProcessTask(*, setup_func: Optional[Callable] = None, setup_kwargs: Optional[dict] = None, worker_kwargs: Optional[dict] = None, **task_kwargs)[source]#
Bases:
utopya.task.WorkerTask
A WorkerTask specialization that uses multiprocessing.Process instead of subprocess.Popen.
It is mostly equivalent to WorkerTask but adjusts the private methods that take care of spawning the actual process and setting up the stream readers, such that the particularities of the PopenMPProcess wrapper are accounted for.
- _spawn_process(args, **popen_kwargs) utopya.task.PopenMPProcess [source]#
This helper takes care only of spawning the actual process and potential error handling. It returns a PopenMPProcess instance, which has the same interface as subprocess.Popen.
- _setup_stream_reader(*args, **kwargs)[source]#
Sets up the stream reader with follow=True, such that the file-like streams that PopenMPProcess uses can be read properly.
- _stop_stream_reader(name: str)[source]#
Stops the stream reader thread with the given name by telling its follow function to stop, thus ending iteration.
- setup_func#
- setup_kwargs#
- worker_kwargs#
- _worker#
- _worker_pid#
- _worker_status#
- streams#
- profiling#
- class utopya.task.TaskList[source]#
Bases:
object
The TaskList stores Task objects, ensuring that no task is contained twice, and can be locked to prevent adding new tasks.
- __contains__(val: utopya.task.Task) bool [source]#
Checks if the given object is contained in this TaskList.
- __getitem__(idx: int) utopya.task.Task [source]#
Returns the item at the given index in the TaskList.
- lock()[source]#
If called, the TaskList becomes locked and allows no further calls to the append method.
- append(val: utopya.task.Task)[source]#
Append a Task object to this TaskList
- Parameters
val (Task) – The task to add
- Raises
RuntimeError – If TaskList object was locked
TypeError – Tried to add a non-Task type object
ValueError – Task already added to this TaskList
- __add__(tasks: Sequence[utopya.task.Task])[source]#
Appends all the tasks in the given iterable to the task list
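The append, lock, and containment semantics can be sketched with a minimal stand-in class (illustrative only; the real TaskList additionally type-checks that appended objects are Task instances):

```python
class SimpleTaskList:
    """Minimal sketch of TaskList semantics: no duplicates, lockable."""

    def __init__(self):
        self._l = []
        self._locked = False

    def lock(self):
        """After this call, append raises RuntimeError."""
        self._locked = True

    def append(self, task):
        if self._locked:
            raise RuntimeError("TaskList is locked; cannot append.")
        if any(t is task for t in self._l):
            raise ValueError("Task was already added to this TaskList.")
        self._l.append(task)

    def __contains__(self, task):
        return any(t is task for t in self._l)

    def __getitem__(self, idx):
        return self._l[idx]

    def __add__(self, tasks):
        # mirrors TaskList.__add__: append all tasks from the iterable
        for t in tasks:
            self.append(t)
        return self

tl = SimpleTaskList()
t1, t2 = object(), object()
tl + [t1, t2]       # appends both tasks
tl.lock()           # no further append calls allowed
print(t1 in tl, tl[1] is t2)  # True True
```

Identity-based duplicate checks (`is`) are used here so that two distinct but equal-comparing tasks can still coexist in the list.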
utopya.testtools module#
Tools that help testing models.
This mainly supplies the ModelTest class, which is a specialization of the
Model
for usage in tests.
- class utopya.testtools.ModelTest(model_name: str, *, test_file: Optional[str] = None, use_tmpdir: bool = True, **kwargs)[source]#
Bases:
utopya.model.Model
A class to use for testing Utopia models.
It attaches to a certain model and makes it easy to load config files with which tests should be carried out.
- __init__(model_name: str, *, test_file: Optional[str] = None, use_tmpdir: bool = True, **kwargs)[source]#
Initialize the ModelTest class for the given model name.
This is basically like the base class __init__, except that it sets the default value of use_tmpdir to True.
- Parameters
model_name (str) – Name of the model to test
test_file (str, optional) – The file this ModelTest is used in. If given, will look for config files relative to the folder this file is located in.
use_tmpdir (bool, optional) – Whether to use a temporary directory to write data to. The default value can be set here; the flag can be overwritten in the create_mv and create_run_load methods. If False, the regular model output directory is used.
- Raises
ValueError – If the directory extracted from test_file is invalid
utopya.tools module#
Implements generally useful functions, partly by importing from
dantro.tools
- utopya.tools.load_selected_keys(src: dict, *, add_to: dict, keys: Sequence[Tuple[str, type, bool]], err_msg_prefix: Optional[str] = None, prohibit_unexpected: bool = True) None [source]#
Loads (only) selected keys from dict src into dict add_to.
- Parameters
src (dict) – The dict to load values from
add_to (dict) – The dict to load values into
keys (Sequence[Tuple[str, type, bool]]) – Which keys to load, given as a sequence of (key, allowed types, [required=False]) tuples.
err_msg_prefix (str) – A description string, used in error messages
prohibit_unexpected (bool, optional) – Whether to raise on keys that were unexpected, i.e. not given in the keys argument.
- Raises
KeyError – On missing key in src
TypeError – On bad type of value in src
ValueError – On unexpected keys in src
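The documented semantics can be sketched with an illustrative re-implementation (not the utopya source; the err_msg_prefix handling is omitted for brevity):

```python
def load_selected_keys(src, *, add_to, keys, prohibit_unexpected=True):
    """Sketch: copy only the listed keys from src into add_to,
    checking types and optionally rejecting unexpected keys."""
    expected = set()
    for key, allowed_types, *rest in keys:
        required = rest[0] if rest else False  # required defaults to False
        expected.add(key)
        if key not in src:
            if required:
                raise KeyError(f"Missing required key: {key}")
            continue
        val = src[key]
        if not isinstance(val, allowed_types):
            raise TypeError(f"Bad type for key '{key}': {type(val).__name__}")
        add_to[key] = val
    if prohibit_unexpected:
        unexpected = set(src) - expected
        if unexpected:
            raise ValueError(f"Unexpected keys: {unexpected}")

cfg = {}
load_selected_keys(
    {"name": "foo", "num_steps": 100},
    add_to=cfg,
    keys=[("name", str, True), ("num_steps", int)],
)
print(cfg)  # {'name': 'foo', 'num_steps': 100}
```

This is the typical "validated subset copy" pattern for loading configuration entries into a target dict in-place.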
- utopya.tools.add_item(value, *, add_to: dict, key_path: Sequence[str], value_func: Optional[Callable] = None, is_valid: Optional[Callable] = None, ErrorMsg: Optional[Callable] = None) None [source]#
Adds the given value to the add_to dict, traversing the given key path. This operation happens in-place.
- Parameters
value – The value of what is to be stored. If this is a callable, the result of the call is stored.
add_to (dict) – The dict to add the entry to
key_path (Sequence[str]) – The path at which to add it
value_func (Callable, optional) – If given, calls it with value as argument and uses the return value to add to the dict
is_valid (Callable, optional) – Used to determine whether value is valid or not; should take a single positional argument and return a bool
ErrorMsg (Callable, optional) – A raisable object that prints an error message; gets passed value as positional argument.
- Raises
Exception – The type depends on the specified ErrorMsg callable
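An illustrative sketch of the key-path traversal logic (not the utopya source; the callable-value case mentioned above is omitted for brevity):

```python
def add_item(value, *, add_to, key_path, value_func=None,
             is_valid=None, ErrorMsg=None):
    """Sketch: store value at the nested key_path inside add_to, in-place."""
    if is_valid is not None and not is_valid(value):
        raise (ErrorMsg(value) if ErrorMsg is not None else ValueError(value))
    if value_func is not None:
        value = value_func(value)
    d = add_to
    for key in key_path[:-1]:
        d = d.setdefault(key, {})  # create intermediate dicts as needed
    d[key_path[-1]] = value

d = {}
add_item(42, add_to=d, key_path=["a", "b", "c"])
print(d)  # {'a': {'b': {'c': 42}}}
```

The `setdefault` traversal is what allows deep paths to be written without pre-creating the intermediate dicts.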
- utopya.tools.pprint(obj: Any, **kwargs)[source]#
Prints a “pretty” string representation of the given object.
- Parameters
obj (Any) – The object to print
**kwargs – Passed to print
- utopya.tools.pformat(obj: Any) str [source]#
Creates a “pretty” string representation of the given object.
This is achieved by creating a yaml representation.
Todo
Improve parsing of leaf-level mappings
- utopya.tools.ensure_not_None(d: typing.Optional[typing.Any], fallback: typing.Union[type, typing.Callable] = <class 'dict'>) Any [source]#
Returns d if it is not None; otherwise creates a new object by calling fallback without any arguments.
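A minimal sketch of the documented behavior; a typical use case is resolving optional arguments without sharing mutable defaults:

```python
def ensure_not_None(d=None, fallback=dict):
    """Sketch: return d unless it is None, in which case call
    fallback() to create a fresh default object."""
    return d if d is not None else fallback()

print(ensure_not_None({"a": 1}))             # {'a': 1}
print(ensure_not_None(None))                 # {}
print(ensure_not_None(None, fallback=list))  # []
```

Because fallback is called each time, every None input gets its own fresh object rather than a shared default.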
- utopya.tools.parse_si_multiplier(s: str) int [source]#
Parses a string like 1.23M or -2.34 k into an integer.
If it is a string, parses the SI multiplier and returns the appropriate integer for use as the number of simulation steps. Supported multipliers are k, M, G, and T. These need to be used as the suffix of the string.
Note
This is only intended to be used with integer values and does not support float values like 1u.
- Parameters
s (str) – A string representing an integer number, potentially including a supported SI multiplier as suffix.
- Returns
- The parsed number of steps as integer. If the value has decimal places, integer rounding is applied.
- Return type
int
- Raises
ValueError – Upon string that does not match the expected pattern
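The documented behavior can be sketched with a small re-implementation (the actual utopya regex may differ; this is illustrative, not the library source):

```python
import re

_SI = {"k": 10**3, "M": 10**6, "G": 10**9, "T": 10**12}

def parse_si_multiplier(s):
    """Sketch: an optionally signed decimal number, optional whitespace,
    optional single SI suffix k/M/G/T; integer rounding is applied."""
    m = re.fullmatch(r"\s*([+-]?\d+(?:\.\d+)?)\s*([kMGT]?)\s*", s)
    if m is None:
        raise ValueError(f"Cannot parse '{s}' as integer with SI suffix")
    number, suffix = m.groups()
    factor = _SI.get(suffix, 1)
    return round(float(number) * factor)

print(parse_si_multiplier("1.23M"))    # 1230000
print(parse_si_multiplier("-2.34 k"))  # -2340
print(parse_si_multiplier("42"))       # 42
```

Note that `round` rather than `int` is used here, matching the "integer rounding" behavior stated above and avoiding truncation artifacts from binary floating point (e.g. `1.23 * 10**6` evaluating to slightly below 1230000).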
- utopya.tools.parse_num_steps(N: Union[str, int], *, raise_if_negative: bool = True) int [source]#
Given a string like 1.23M or an integer, prepares the num_steps argument for a single universe simulation.
For string arguments, uses parse_si_multiplier() for string parsing. If that fails, attempts to read it in float notation by calling int(float(N)).
Note
This function always applies integer rounding.
- Parameters
- Returns
The parsed value for num_steps
- Return type
int
- Raises
ValueError – Result invalid, i.e. not parseable or of negative value.
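A self-contained sketch of the documented fallback chain (the helper name _parse_si and its regex are illustrative assumptions, not the utopya implementation):

```python
import re

def _parse_si(s):
    """Hypothetical minimal SI-suffix parser used by the sketch below."""
    m = re.fullmatch(r"\s*([+-]?\d+(?:\.\d+)?)\s*([kMGT]?)\s*", s)
    if m is None:
        raise ValueError(s)
    factors = {"k": 10**3, "M": 10**6, "G": 10**9, "T": 10**12, "": 1}
    return round(float(m.group(1)) * factors[m.group(2)])

def parse_num_steps(N, *, raise_if_negative=True):
    """Sketch: integers pass through; strings first try SI parsing,
    then fall back to int(float(N)) for plain float notation."""
    if isinstance(N, str):
        try:
            N = _parse_si(N)
        except ValueError:
            N = int(float(N))  # fallback: float notation like "1e6"
    if raise_if_negative and N < 0:
        raise ValueError(f"Invalid num_steps: {N}")
    return N

print(parse_num_steps("1.23M"))  # 1230000
print(parse_num_steps("1e6"))    # 1000000
print(parse_num_steps(100))      # 100
```

Scientific notation like "1e6" does not match the SI pattern, so it falls through to the `int(float(N))` branch, exactly as the docstring describes.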
utopya.workermanager module#
The WorkerManager
is a central part of utopya in that it
spawns and controls the tasks (WorkerTask
) that are
to be worked on.
- utopya.workermanager.STOPCOND_EXIT_CODES: Sequence[int] = (-10, 10, 138)#
Exit codes of a WorkerTask that will be interpreted as stemming from a stop condition. This depends on the signal used for stop conditions (utopya.stop_conditions.SIG_STOPCOND). This sequence of possible exit codes takes into account that the sign may be switched (depending on whether a signed or unsigned integer convention is used) or that a convention is used in which a handled signal is turned into an exit code of 128 + abs(signum).
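To see why these three particular codes appear, one can enumerate the conventions described above, assuming the stop-condition signal has number 10 (the exact signal is defined by SIG_STOPCOND; 10 is assumed here for illustration):

```python
# Assumed for illustration: the stop-condition signal number is 10
signum = 10

codes = {
    -signum,            # multiprocessing convention: negative signal number
    signum,             # the same value under an unsigned interpretation
    128 + abs(signum),  # shell convention: 128 + signal number
}
print(sorted(codes))  # [-10, 10, 138]
```

Any of these three values observed as a task's exit code is therefore treated as "terminated by a stop condition" rather than as an error.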
- class utopya.workermanager.WorkerManager(num_workers: typing.Union[int, str] = 'auto', poll_delay: float = 0.05, spawn_rate: int = -1, lines_per_poll: int = 50, periodic_task_callback: typing.Optional[int] = None, QueueCls: type = <class 'queue.Queue'>, reporter: typing.Optional[utopya.reporter.WorkerManagerReporter] = None, rf_spec: typing.Optional[typing.Dict[str, typing.Union[str, typing.List[str]]]] = None, save_streams_on: typing.Sequence[str] = (), nonzero_exit_handling: str = 'ignore', interrupt_params: typing.Optional[dict] = None, cluster_mode: bool = False, resolved_cluster_params: typing.Optional[dict] = None)[source]#
Bases:
object
The WorkerManager class orchestrates WorkerTask objects: setting them up, invoking them, tracking their progress, and starting new workers if previous workers finished.
- Attrs:
- rf_spec (dict): The report format specifications that are used throughout the WorkerManager. These are invoked at different points of its operation: while_working, after_work, after_abort, task_spawn, task_finished.
- __init__(num_workers: typing.Union[int, str] = 'auto', poll_delay: float = 0.05, spawn_rate: int = -1, lines_per_poll: int = 50, periodic_task_callback: typing.Optional[int] = None, QueueCls: type = <class 'queue.Queue'>, reporter: typing.Optional[utopya.reporter.WorkerManagerReporter] = None, rf_spec: typing.Optional[typing.Dict[str, typing.Union[str, typing.List[str]]]] = None, save_streams_on: typing.Sequence[str] = (), nonzero_exit_handling: str = 'ignore', interrupt_params: typing.Optional[dict] = None, cluster_mode: bool = False, resolved_cluster_params: typing.Optional[dict] = None)[source]#
Initialize the worker manager.
- Parameters
num_workers (Union[int, str], optional) – The number of workers that can work in parallel. If ‘auto’ (default), uses os.cpu_count(). If below zero, deduces abs(num_workers) from the CPU count.
poll_delay (float, optional) – How long (in seconds) the delay between worker polls should be. For too small delays (<0.01), the CPU load will become significant.
spawn_rate (int, optional) – How many workers to spawn each working loop iteration. If -1, will assign new tasks to all free workers.
lines_per_poll (int, optional) – How many lines to read from each stream during polling of the tasks. This value should not be too large, otherwise the polling is delayed by too much. By setting it to -1, all available lines are read.
periodic_task_callback (int, optional) – If given, an additional task callback will be invoked after every periodic_task_callback poll events.
QueueCls (type, optional) – Which class to use for the queue. Defaults to the FiFo queue.Queue.
reporter (WorkerManagerReporter, optional) – The reporter associated with this WorkerManager, reporting on the progress.
rf_spec (Dict[str, Union[str, List[str]]], optional) –
The names of report formats that should be invoked at different points of the WorkerManager's operation. Possible keys: before_working, while_working, after_work, after_abort, task_spawn, task_finished. All other keys are ignored.
The values of the dict can be either strings or lists of strings, where the strings always refer to report formats registered with the WorkerManagerReporter. This argument updates the default report format specifications.
save_streams_on (Sequence[str], optional) – On which events to invoke save_streams() during work. Should be a sequence containing one or both of the keys on_monitor_update, periodic_callback.
nonzero_exit_handling (str, optional) – How to react if a WorkerTask exits with a non-zero exit code. For 'ignore', nothing happens. For 'warn', a warning is printed and the last 5 lines of the log are shown. For 'raise', the last 20 lines of the log are shown, all other tasks are terminated, and the WorkerManager exits with the same exit code as the WorkerTask exited with. Note that 'warn' will not lead to any messages if the worker died by SIGTERM, which presumably originated from a fulfilled stop condition. Use 'warn_all' to also receive warnings in this case.
interrupt_params (dict, optional) –
Parameters that determine how the WorkerManager behaves when receiving KeyboardInterrupts during working. Possible keys:
send_signal: Which signal to send to the workers. Can be SIGINT (default), SIGTERM, SIGKILL, or any valid signal as integer.
grace_period: How long to wait for the other workers to gracefully shut down. After this period (in seconds), the workers will be killed via SIGKILL. Default is 5s.
exit: Whether to sys.exit at the end of start_working. Default is True.
cluster_mode (bool, optional) – Whether tasks similar to those managed by this WorkerManager are, at the same time, being worked on by other WorkerManager instances. This is relevant because the output files might be affected by whether another WorkerManager instance is currently working on the same output directory. Also, in the future, this argument might be used to communicate between nodes.
resolved_cluster_params (dict, optional) – The corresponding cluster parameters.
- Raises
ValueError – For a num_workers argument that is too negative
- pending_exceptions: queue.Queue = None#
A (FiFo) queue of Exception objects that will be handled by the WorkerManager during working. This is the interface that allows other threads that have access to the WorkerManager to add an exception and let it be handled in the main thread.
- rf_spec: dict = None#
The report format specifications that are used throughout the WorkerManager. These are invoked at different points of the operation:
while_working
after_work
after_abort
task_spawn
task_finished
- property tasks: utopya.task.TaskList#
The list of all tasks.
- property task_queue: queue.Queue#
The task queue.
- property task_count: int#
Returns the number of tasks that this manager ever took care of. Careful: This is NOT the current number of tasks in the queue!
- property active_tasks: List[utopya.task.WorkerTask]#
The list of currently active tasks.
Note that this information might not be up-to-date; a process might quit just after the list has been updated.
- property num_finished_tasks: int#
The number of finished tasks. Incremented whenever a task leaves the active_tasks list, regardless of its exit status.
- property stop_conditions: Set[utopya.stop_conditions.StopCondition]#
All stop conditions that were ever passed to start_working() during the lifetime of this WorkerManager.
- property nonzero_exit_handling: str#
Behavior upon a worker exiting with a non-zero exit code.
with ignore, nothing happens
with warn, a warning is printed
with raise, the log is shown and the WorkerManager exits with the same exit code as the corresponding WorkerTask exited with.
- property reporter: Optional[utopya.reporter.WorkerManagerReporter]#
The associated
WorkerManagerReporter
or None, if no reporter is set.
- property resolved_cluster_params: dict#
Returns a copy of the cluster configuration with all parameters resolved (thus making some additional keys available on the top level). It is returned as a deep copy to avoid mutability issues.
- add_task(*, TaskCls: type = <class 'utopya.task.WorkerTask'>, **task_kwargs) utopya.task.WorkerTask [source]#
Adds a task to the WorkerManager.
- Parameters
TaskCls (type, optional) – The WorkerTask-like type to use
**task_kwargs – All arguments needed for WorkerTask initialization. See
utopya.task.WorkerTask
for all valid arguments.
- Returns
The created WorkerTask object
- Return type
- start_working(*, detach: bool = False, timeout: Optional[float] = None, stop_conditions: Optional[Sequence[utopya.stop_conditions.StopCondition]] = None, post_poll_func: Optional[Callable] = None) None [source]#
Upon call, all enqueued tasks will be worked on sequentially.
- Parameters
detach (bool, optional) – If False (default), the WorkerManager will block here, as it continuously polls the workers and distributes tasks.
timeout (float, optional) – If given, the number of seconds this work session is allowed to take. Workers will be aborted if the number is exceeded. Note that this is not measured in CPU time, but in the host system's wall time.
stop_conditions (Sequence[StopCondition], optional) – During the run these StopCondition objects will be checked
post_poll_func (Callable, optional) – If given, this is called after all workers have been polled. It can be used to perform custom actions during the polling loop.
- Raises
NotImplementedError – If detach was set
ValueError – For invalid (i.e., negative) timeout value
WorkerManagerTotalTimeout – Upon a total timeout
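The grab-poll-refill cycle of start_working can be sketched with plain Python stand-ins (DummyTask and work are hypothetical names used for illustration; real tasks are WorkerTask objects that spawn actual worker processes):

```python
import queue

class DummyTask:
    """Stand-in for WorkerTask: finishes after a fixed number of polls."""
    def __init__(self, name, polls_needed):
        self.name = name
        self._polls_left = polls_needed
        self.returncode = None

    def spawn(self):
        pass  # a real WorkerTask would start its worker process here

    def poll(self):
        self._polls_left -= 1
        if self._polls_left <= 0:
            self.returncode = 0  # finished successfully
        return self.returncode

def work(task_queue, num_workers=2):
    """Simplified analogue of start_working: grab tasks up to
    num_workers, poll them, refill free slots until queue is empty."""
    active, finished = [], []
    while active or not task_queue.empty():
        # grab new tasks while there are free worker slots
        while len(active) < num_workers and not task_queue.empty():
            t = task_queue.get_nowait()
            t.spawn()
            active.append(t)
        # poll active tasks, moving finished ones out
        for t in list(active):
            if t.poll() is not None:
                active.remove(t)
                finished.append(t)
    return finished

q = queue.Queue()
for i in range(5):
    q.put(DummyTask(f"task{i}", polls_needed=i + 1))
done = work(q)
print(len(done))  # 5
```

In the real WorkerManager, the poll step additionally reads a bounded number of stream lines per task (lines_per_poll) and checks stop conditions between iterations.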
- _invoke_report(rf_spec_name: str, *args, **kwargs)[source]#
Helper function to invoke the reporter’s report function
- _grab_task() utopya.task.WorkerTask [source]#
Gets a task from the queue and initiates the spawning of its worker process.
- Returns
The WorkerTask grabbed from the queue.
- Return type
- Raises
queue.Empty – If the task queue was empty
- _poll_workers() None [source]#
Will poll all workers that are in the working list and remove them from that list if they are no longer alive.
- _check_stop_conds(stop_conds: Sequence[utopya.stop_conditions.StopCondition]) Set[utopya.task.WorkerTask] [source]#
Checks the given stop conditions for the active tasks and compiles a list of tasks that need to be terminated.
- Parameters
stop_conds (Sequence[StopCondition]) – The stop conditions that are to be checked.
- Returns
- The WorkerTasks whose workers need to be
terminated
- Return type
List[WorkerTask]
- _signal_workers(tasks: Union[str, List[utopya.task.WorkerTask]], *, signal: Union[str, int]) None [source]#
Send signals to a list of WorkerTasks.
- Parameters
tasks (Union[str, List[WorkerTask]]) – The strings ‘all’ or ‘active’, or a list of WorkerTasks to signal
- _handle_pending_exceptions() None [source]#
This method handles the list of pending exceptions during working, starting from the one added most recently.
As the WorkerManager occupies the main thread, it is difficult for other threads to signal to the WorkerManager that an exception occurred. The pending_exceptions attribute allows such a handling; child threads can just add an exception object to it and they get handled during working of the WorkerManager.
This method handles the following exception types in a specific manner:
WorkerTaskStopConditionFulfilled: never raising or logging
WorkerTaskNonZeroExit: raising or logging depending on the value of the nonzero_exit_handling property
- Returns
None
- Raises
Exception – The exception that was added first to the queue of pending exceptions
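The pattern can be sketched independently of utopya: a child thread enqueues an exception it cannot raise in the main thread, and the main loop later drains the queue and decides how to handle each entry:

```python
import queue
import threading

# FiFo queue serving as the cross-thread exception channel
pending_exceptions = queue.Queue()

def child_thread_work():
    # a child thread cannot raise into the main thread directly,
    # so it enqueues the exception instead
    try:
        raise RuntimeError("worker failed")
    except RuntimeError as exc:
        pending_exceptions.put(exc)

t = threading.Thread(target=child_thread_work)
t.start()
t.join()

# main thread: drain pending exceptions and handle each one
handled = []
while True:
    try:
        exc = pending_exceptions.get_nowait()
    except queue.Empty:
        break
    handled.append(type(exc).__name__)
print(handled)  # ['RuntimeError']
```

In the real WorkerManager, this drain step runs inside the working loop, and certain exception types (e.g. fulfilled stop conditions) are filtered out rather than re-raised.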
utopya.yaml module#
Takes care of the YAML setup for Utopya.
In the module import order, this module needs to be downstream from all modules that implement objects that require a custom YAML representation.
- utopya.yaml._parameter_shorthand_constructor(loader, node) utopya.parameter.Parameter [source]#
Constructs a Parameter object from a scalar YAML node using scalar_node_to_object().
The YAML tag is used as shorthand mode argument to the from_shorthand class method.