Source code for utopya.eval.plots._attractor

"""This module holds data processing functionality that is used in the plot
functions.
"""

import logging
import operator
import warnings
from typing import List, Tuple, Union

import numpy as np
import xarray as xr
from dantro.base import BaseDataGroup
from scipy.signal import find_peaks

log = logging.getLogger(__name__)


# Data Analysis ---------------------------------------------------------------


[docs]def find_endpoint( data: xr.DataArray, *, time: int = -1, **kwargs ) -> Tuple[bool, xr.DataArray]: """Find the endpoint of a dataset wrt. ``time`` coordinate. This function is compatible with the :py:func:`utopya.eval.plots.attractor.bifurcation_diagram`. Arguments: data (xarray.DataArray): The data time (int, optional): The time index to select **kwargs: Passed on to :py:meth:`xarray.DataArray.isel` call Returns: Tuple[bool, xarray.DataArray]: The result in the form of a 2-tuple ``(endpoint found, endpoint)`` """ return True, data.isel(time=time, **kwargs)
[docs]def find_fixpoint( data: xr.Dataset, *, spin_up_time: int = 0, abs_std: float = None, rel_std: float = None, mean_kwargs=None, std_kwargs=None, isclose_kwargs=None, squeeze: bool = True, ) -> Tuple[bool, float]: """Find the fixpoint(s) of a dataset and confirm it by standard deviation. For dimensions that are not ``time`` the fixpoints are compared and duplicates removed. This function is compatible with the :py:func:`utopya.eval.plots.attractor.bifurcation_diagram`. Arguments: data (xarray.Dataset): The data spin_up_time (int, optional): The first timestep included abs_std (float, optional): The maximal allowed absolute standard deviation rel_std (float, optional): The maximal allowed relative standard deviation mean_kwargs (dict, optional): Additional keyword arguments passed on to the appropriate array function for calculating mean on data. std_kwargs (dict, optional): Additional keyword arguments passed on to the appropriate array function for calculating std on data. isclose_kwargs (dict, optional): Additional keyword arguments passed on to the appropriate array function for calculating np.isclose for fixpoint-duplicates across dimensions other than 'time'. squeeze (bool, optional): Use the data.squeeze method to remove dimensions of length one. Default is True. Returns: tuple: (fixpoint found, mean) """ if squeeze: data = data.squeeze() if len(data.dims) > 2: raise ValueError( "Method 'find_fixpoint' cannot handle data with more than 2 " f"dimensions. Data has dims {data.dims}" ) if spin_up_time > data.time[-1]: raise ValueError( "Spin up time was chosen larger than actual simulation time in " f"module find_fixpoint. Was {spin_up_time}, but simulation time " f"was {data.time.data[-1]}." ) # Get the data data = data.where(data.time >= spin_up_time, drop=True) # Calculate mean and std mean = data.mean(dim="time", **(mean_kwargs if mean_kwargs else {})) std = data.std(dim="time", **(std_kwargs if std_kwargs else {})) # Apply some masking, if parameters are given if abs_std is not None: mean = mean.where(std < abs_std) if rel_std is not None: mean = mean.where(std / mean < rel_std) conclusive = True for data_var_name, data_var in mean.data_vars.items(): if data_var.shape: for i, val in enumerate(data_var[:-1]): mask = np.isclose( val, data_var[i + 1 :], **(isclose_kwargs if isclose_kwargs else {}), ) data_var[i + 1 :][mask] = np.nan conclusive = conclusive and (np.count_nonzero(~np.isnan(data_var)) > 0) return conclusive, mean
[docs]def find_multistability(*args, **kwargs) -> Tuple[bool, float]: """Find the multistabilities of a dataset. Invokes :py:func:`.find_fixpoint`. This function is conclusive if :py:func:`.find_fixpoint` is conclusive with multiple entries. Args: *args: passed to :py:func:`.find_fixpoint` **kwargs: passed to :py:func:`.find_fixpoint` Returns Tuple[bool, float]: ``(multistability found, mean value)`` """ conclusive, mean = find_fixpoint(*args, **kwargs) if not conclusive: return conclusive, mean for data_var_name, data_var in mean.data_vars.items(): # Conclusive only if there are at least two non-nan values. # Count the non-zero entries of the inverse of boolian array np.isnan. # Need negation operator for that. if np.count_nonzero(~np.isnan(data_var)) > 1: return True, mean return False, mean
[docs]def find_oscillation( data: xr.Dataset, *, spin_up_time: int = 0, squeeze: bool = True, **find_peak_kwargs, ) -> Tuple[bool, list]: """Find oscillations in a dataset. This function is compatible with the :py:func:`utopya.eval.plots.attractor.bifurcation_diagram`. Arguments: data (xarray.Dataset): The data spin_up_time (int, optional): The first timestep included squeeze (bool, optional): Use the data.squeeze method to remove dimensions of length one. Default is True. **find_peak_kwargs: Passed on to :py:func:`scipy.signal.find_peaks`. If not given, will set ``height`` kwarg to ``-1.e+15``. Returns: Tuple[bool, list]: (oscillation found, [min, max]) """ if squeeze: data = data.squeeze() if len(data.dims) > 1: raise ValueError( "Method 'find_oscillation' cannot handle data with more than 1 " f"dimension. Data has dims {data.dims}" ) if spin_up_time > data.time[-1]: raise ValueError( "Spin up time was chosen larger than actual simulation time in " f"module find_oscillation. Was {spin_up_time}, but simulation " f"time was {data.time.data[-1]}." ) # Only use the data after spin up time data = data.where(data.time >= spin_up_time, drop=True) coords = {k: v for k, v in data.coords.items()} coords.pop("time", None) coords["osc"] = ["min", "max"] attractor = xr.Dataset(coords=coords, attrs={"conclusive": False}) if not find_peak_kwargs.get("height"): find_peak_kwargs["height"] = -1e15 for data_var_name, data_var in data.items(): maxima, max_props = find_peaks(data_var, **find_peak_kwargs) amax = np.amax(data_var) minima, min_props = find_peaks(amax - data_var, **find_peak_kwargs) if not maxima.size or not minima.size: mean = data_var.mean(dim="time") attractor[data_var_name] = ("osc", [mean, mean]) else: # Build (min, max) pair min_max = [ amax - min_props["peak_heights"][-1], max_props["peak_heights"][-1], ] attractor[data_var_name] = ("osc", min_max) # at least one data_var performs oscillations attractor.attrs["conclusive"] = True if attractor.attrs["conclusive"]: return True, attractor return False, attractor