colosseum.mdp.base

   1import abc
   2import random
   3import sys
   4from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Tuple, Type, Union
   5
   6import dm_env
   7import networkx as nx
   8import numpy as np
   9import yaml
  10from dm_env.specs import BoundedArray, Array
  11from pydtmc import MarkovChain
  12from scipy.stats import rv_continuous
  13
  14from colosseum import config
  15from colosseum.dynamic_programming import discounted_value_iteration
  16from colosseum.dynamic_programming.infinite_horizon import discounted_policy_evaluation
  17from colosseum.dynamic_programming.utils import get_policy_from_q_values
  18from colosseum.emission_maps import EmissionMap
  19from colosseum.emission_maps import Tabular
  20from colosseum.hardness.measures import calculate_norm_average
  21from colosseum.hardness.measures import calculate_norm_discounted
  22from colosseum.hardness.measures import find_hardness_report_file
  23from colosseum.hardness.measures import get_diameter
  24from colosseum.hardness.measures import get_sum_reciprocals_suboptimality_gaps
  25from colosseum.mdp.utils.communication_class import MDPCommunicationClass
  26from colosseum.mdp.utils.communication_class import get_communication_class
  27from colosseum.mdp.utils.communication_class import get_recurrent_nodes_set
  28from colosseum.mdp.utils.custom_samplers import NextStateSampler
  29from colosseum.mdp.utils.markov_chain import get_average_rewards, get_markov_chain
  30from colosseum.mdp.utils.markov_chain import get_stationary_distribution
  31from colosseum.mdp.utils.markov_chain import get_transition_probabilities
  32from colosseum.mdp.utils.mdp_creation import NodeInfoClass
  33from colosseum.mdp.utils.mdp_creation import get_transition_matrix_and_rewards
  34from colosseum.mdp.utils.mdp_creation import instantiate_transitions
  35from colosseum.utils import clean_for_storing
  36from colosseum.utils.acme.specs import DiscreteArray
  37from colosseum.utils.formatter import clean_for_file_path
  38
  39if TYPE_CHECKING:
  40    from colosseum.mdp import ACTION_TYPE, NODE_TYPE, ContinuousMDP, EpisodicMDP
  41
  42sys.setrecursionlimit(5000)
  43
  44
  45class BaseMDP(dm_env.Environment, abc.ABC):
  46    """
  47    The base class for MDPs.
  48    """
  49
  50    @staticmethod
  51    @abc.abstractmethod
  52    def get_unique_symbols() -> List[str]:
  53        """
  54        Returns
  55        -------
  56        List[str]
  57            the unique symbols of the grid representation of the MDP.
  58        """
  59
  60    @staticmethod
  61    def get_available_hardness_measures() -> List[str]:
  62        """
  63        Returns
  64        -------
  65        List[str]
  66            The hardness measures available in the package.
  67        """
  68        return ["diameter", "value_norm", "suboptimal_gaps"]
  69
  70    @staticmethod
  71    def produce_gin_file_from_mdp_parameters(
  72        parameters: Dict[str, Any], mdp_class_name: str, index: int = 0
  73    ) -> str:
  74        """
  75        Parameters
  76        ----------
  77        parameters : Dict[str, Any]
  78            The parameters of the MDP.
  79        mdp_class_name : str
  80            The name of the class of the MDP.
  81        index : int
  82            The index for the gin configuration
  83
  84        Returns
  85        -------
  86        str
  87            The gin configuration for the given parameters and index.
  88        """
  89
  90        string = ""
  91        for k, v in parameters.items():
  92            string += f"prms_{index}/{mdp_class_name}.{k} = {v}\n"
  93        return string
  94
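    # ----------------------------------------------------------------------
    # Editor's example (not part of the library source): a minimal sketch of
    # how the gin string is assembled. The class name "CustomMDP" and the
    # parameter names below are hypothetical placeholders.
    #
    #   gin_str = BaseMDP.produce_gin_file_from_mdp_parameters(
    #       {"size": 10, "p_rand": 0.1}, "CustomMDP", index=3
    #   )
    #   # gin_str == "prms_3/CustomMDP.size = 10\nprms_3/CustomMDP.p_rand = 0.1\n"
    # ----------------------------------------------------------------------
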
  95    @staticmethod
  96    @abc.abstractmethod
  97    def does_seed_change_MDP_structure() -> bool:
  98        """
  99        Returns
 100        -------
 101        bool
  102            True if changing the seed changes the transition matrix and/or the rewards matrix. This may happen,
  103        for example, when there are fewer starting states than possible ones and the effective starting states are
  104        picked at random based on the seed.
 105        """
 106
 107    @staticmethod
 108    @abc.abstractmethod
 109    def is_episodic() -> bool:
 110        """
 111        Returns
 112        -------
 113        bool
 114            True if the MDP is episodic.
 115        """
 116
 117    @staticmethod
 118    @abc.abstractmethod
 119    def sample_parameters(n: int, seed: int = None) -> List[Dict[str, Any]]:
 120        """
 121        Returns
 122        -------
 123        List[Dict[str, Any]]
 124            n sampled parameters that can be used to construct an MDP in a reasonable amount of time.
 125        """
 126
 127    @staticmethod
 128    @abc.abstractmethod
 129    def sample_mdp_parameters(
 130        n: int, is_episodic: bool, seed: int = None
 131    ) -> List[Dict[str, Any]]:
 132        """
 133        Returns
 134        -------
 135        List[Dict[str, Any]]
 136            n sampled parameters that can be used to construct an MDP in a reasonable amount of time.
 137        """
 138
 139    @staticmethod
 140    @abc.abstractmethod
 141    def get_node_class() -> Type["NODE_TYPE"]:
 142        """
 143        Returns
 144        -------
 145        Type["NODE_TYPE"]
 146            The class of the nodes of the MDP.
 147        """
 148
 149    @property
 150    @abc.abstractmethod
 151    def n_actions(self) -> int:
 152        """
 153        Returns
 154        -------
 155        int
 156            The number of available actions.
 157        """
 158
 159    @abc.abstractmethod
 160    def _get_next_nodes_parameters(
 161        self, node: "NODE_TYPE", action: "ACTION_TYPE"
 162    ) -> Tuple[Tuple[dict, float], ...]:
 163        """
 164        Returns
 165        -------
 166        Tuple[Tuple[dict, float], ...]
 167            The parameters of the possible next nodes reachable from the given state and action.
 168        """
 169
 170    @abc.abstractmethod
 171    def _get_reward_distribution(
 172        self, node: "NODE_TYPE", action: "ACTION_TYPE", next_node: "NODE_TYPE"
 173    ) -> rv_continuous:
 174        """
 175        Returns
 176        -------
 177        rv_continuous
 178            The distribution over rewards in the zero one interval when transitioning from a given state and action
 179        to a given next state. Note that rescaling the rewards to a different range is handled separately.
 180        """
 181
 182    @abc.abstractmethod
 183    def _get_starting_node_sampler(self) -> NextStateSampler:
 184        """
 185        Returns
 186        -------
 187        NextStateSampler
 188            The sampler over next states that corresponds to the starting state distribution.
 189        """
 190
 191    @abc.abstractmethod
 192    def _check_parameters_in_input(self):
 193        """
 194        checks whether the parameters given in input can produce a correct MDP instance.
 195        """
 196        assert self._p_rand is None or (0 < self._p_rand < 0.9999)
 197        assert self._p_lazy is None or (0 < self._p_lazy < 0.9999)
 198
 199    @abc.abstractmethod
 200    def get_grid_representation(self, node: "NODE_TYPE", h: int = None) -> np.ndarray:
 201        """
 202        Returns
 203        -------
 204        np.ndarray
  205            An ASCII representation of the state given in input stored as a numpy array.
 206        """
 207
 208    @abc.abstractmethod
 209    def _get_grid_representation(self, node: "NODE_TYPE") -> np.ndarray:
 210        """
 211        Returns
 212        -------
 213        np.ndarray
  214            An ASCII representation of the state given in input stored as a numpy array.
 215        """
 216
 217    @property
 218    @abc.abstractmethod
 219    def _possible_starting_nodes(self) -> List["NODE_TYPE"]:
 220        """
 221        Returns
 222        -------
 223        List["NODE_TYPE"]
  224            The list containing all the possible starting states for this MDP instance.
 225        """
 226
 227    @property
 228    @abc.abstractmethod
 229    def parameters(self) -> Dict[str, Any]:
 230        """
 231        Returns
 232        -------
 233        Dict[str, Any]
 234            The parameters of the MDP.
 235        """
 236        return dict(
 237            seed=self._seed,
 238            randomize_actions=self._randomize_actions,
 239            p_lazy=self._p_lazy,
 240            p_rand=self._p_rand,
 241            rewards_range=self._rewards_range,
 242            make_reward_stochastic=self._make_reward_stochastic,
 243            reward_variance_multiplier=self._reward_variance_multiplier,
 244        )
 245
 246    @abc.abstractmethod
 247    def get_gin_parameters(self, index: int) -> str:
 248        """
 249        Returns
 250        -------
 251        str
 252            The gin config of the MDP instance.
 253        """
 254
 255    def get_gin_config(self, index: int) -> str:
 256        """
 257        Returns
 258        -------
 259        str
 260            The gin config file of the MDP.
 261        """
 262        return "".join(self.get_gin_parameters(index))
 263
 264    def get_node_labels(self, l: List[Any]) -> Dict["NODE_TYPE", Any]:
 265        """
 266        Returns
 267        -------
 268        Dict["NODE_TYPE", Any]
 269            A dictionary that assigns to each node the value of the list in the corresponding index.
 270        """
 271        assert len(l) == self.n_states, (
  272            f"The list in input should have the same length as the number of states {self.n_states}, "
  273            f"but a list of length {len(l)} was received instead."
 274        )
 275        return {self.index_to_node[i]: l[i] for i in range(len(l))}
 276
  277    def get_node_action_labels(self, l: List[List[Any]]) -> Dict[Tuple["NODE_TYPE", "ACTION_TYPE"], Any]:
 278        """
 279        Returns
 280        -------
  281        Dict[Tuple["NODE_TYPE", "ACTION_TYPE"], Any]
 282            A dictionary that assigns to each node and action the value of the list in the corresponding index.
 283        """
 284        assert len(l) == self.n_states, (
  285            f"The list in input should have the same length as the number of states {self.n_states}, "
  286            f"but a list of length {len(l)} was received instead."
 287        )
 288        assert all(
 289            len(ll) == self.n_actions for ll in l
  290        ), f"The inner lists should have the same length as the number of actions {self.n_actions}."
 291        return {
 292            (self.index_to_node[i], action): l[i][action]
 293            for i in range(len(l))
 294            for action in range(self.n_actions)
 295        }
 296
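    # ----------------------------------------------------------------------
    # Editor's example (not part of the library source): labelling each node
    # with its optimal state value, assuming `mdp` is an instantiated
    # continuous (non-episodic) MDP so that V has one entry per state.
    #
    #   _, V = mdp.optimal_value_functions
    #   labels = mdp.get_node_labels(V.tolist())
    #   # labels[node] is the optimal value of `node`
    # ----------------------------------------------------------------------
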
 297    @property
 298    def hash(self) -> str:
 299        """
 300        Returns
 301        -------
 302        str
 303            The hash value based on the parameters of the MDP. This can be used to create cache files.
 304        """
 305        s = "_".join(map(str, clean_for_storing(list(self.parameters.values()))))
 306        return f"mdp_{type(self).__name__}_" + clean_for_file_path(s)
 307
 308    def __str__(self) -> str:
 309        """
 310        Returns
 311        -------
 312        str
 313            The string containing all the information about the MDP including parameters, measures of hardness and
 314        graph metrics.
 315        """
 316        string = type(self).__name__ + "\n"
 317        m_l = 0
 318        for k, v in self.summary.items():
 319            m_l = max(m_l, len(max(v.keys(), key=len)) + 4)
 320        for k, v in self.summary.items():
 321            string += "\t" + k + "\n"
 322            for kk, vv in v.items():
 323                string += f"\t\t{kk}{' ' * (m_l - len(kk))}:\t{vv}\n"
 324        return string
 325
 326    @abc.abstractmethod
 327    def __init__(
 328        self: Union["EpisodicMDP", "ContinuousMDP"],
 329        seed: int,
 330        randomize_actions: bool = True,
 331        make_reward_stochastic=False,
 332        reward_variance_multiplier: float = 1.0,
 333        p_lazy: float = None,
 334        p_rand: float = None,
 335        rewards_range: Tuple[float, float] = (0.0, 1.0),
 336        emission_map: Type[EmissionMap] = Tabular,
 337        emission_map_kwargs: Dict[str, Any] = dict(),
 338        noise: Type[EmissionMap] = None,
 339        noise_kwargs: Dict[str, Any] = dict(),
 340        instantiate_mdp: bool = True,
 341        force_sparse_transition: bool = False,
 342        exclude_horizon_from_parameters: bool = False,
 343    ):
 344        """
 345        Parameters
 346        ----------
 347        seed : int
 348            The random seed.
 349        randomize_actions : bool
  350            If True, the outcome of the actions at different states is randomized. By default, it is set to True.
 351        make_reward_stochastic : bool
 352            If True, the rewards of the MDP will be stochastic. By default, it is set to False.
 353        reward_variance_multiplier : float
 354            A multiplier for the variance of the rewards that keeps the mean fixed. The lower the value, the higher the
 355             variance. By default, it is set to 1.
 356        p_lazy : float
 357            The probability that an MDP stays in the same state instead of executing the action selected by an agent. By default, it is set to zero.
 358        p_rand : float
 359            The probability of selecting an action at random instead of the one specified by the agent. By default, it
 360            is set to zero.
 361        rewards_range : Tuple[float, float]
 362            The maximum range of values for the rewards. By default, it is `(0, 1)`.
 363        emission_map : Type[EmissionMap]
  364            The emission map that renders the MDP non-tabular. By default, no emission map is used.
 365        emission_map_kwargs : Dict[str, Any]
 366            The keyword arguments for the emission map.
 367        noise : Type[EmissionMap]
 368            The noise class to make the emission map stochastic. By default, the emission map is left deterministic.
 369        noise_kwargs : Dict[str, Any]
 370            The keyword arguments for the noise.
 371        instantiate_mdp : bool
 372            If True, the underlying graph for the MDP is immediately instantiated. By default, it is set to True.
 373        force_sparse_transition : bool
  374            If True, the transition matrix is forced to be stored in a sparse matrix format. By default, it is not
 375            enforced.
 376        exclude_horizon_from_parameters : bool
 377            If True, in the episodic case, the horizon is not considered a parameter when computing the `parameters`
  378            property and the hash. By default, it is not excluded.
 379        """
 380
 381        # MDP generic variables
 382        self._seed = seed
 383        self._randomize_actions = randomize_actions
 384        self._make_reward_stochastic = make_reward_stochastic
 385        self._reward_variance_multiplier = reward_variance_multiplier
 386        self._hardness_reports_folder = config.get_hardness_measures_cache_folder()
 387        self._force_sparse_transition = force_sparse_transition
 388        self._exclude_horizon_from_parameters = exclude_horizon_from_parameters
 389        self._p_rand = p_rand if p_rand is None or p_rand > 0.0 else None
 390        self._p_lazy = p_lazy if p_lazy is None or p_lazy > 0.0 else None
 391        self.rewards_range = self._rewards_range = (
 392            rewards_range
 393            if rewards_range[0] < rewards_range[1]
 394            else rewards_range[::-1]
 395        )
 396        self._are_all_rewards_deterministic = True
 397        self._are_all_transition_deterministic = True
 398        self.r_min, self.r_max = self.rewards_range
 399
 400        # MDP loop variables
 401        self._hr = None
 402        self.cur_node = None
 403        self.last_edge = None
 404        self.last_starting_node = None
 405        self.is_reset_necessary = True
 406        self.current_timestep = 0
 407        self.h = 0
 408        self._rng = np.random.RandomState(seed)
 409        self._fast_rng = random.Random(seed)
 410
  411        # Caching variables. Note that we cannot use lru_cache because it prevents the objects from being deleted.
 412        self._cached_rewards = dict()
 413        self._cached_reward_distributions = dict()
 414        self._action_mapping = dict()
 415        self._communication_class = None
 416        self._recurrent_nodes_set = None
 417        if not hasattr(self, "_transition_matrix_and_rewards"):
 418            self._transition_matrix_and_rewards = None
 419        self._graph_layout = None
 420        self._graph_metrics = None
 421        self._summary = None
 422        self._diameter = None
 423        self._sum_reciprocals_suboptimality_gaps = None
 424        self._optimal_value_norm = dict()
 425        self._optimal_value = None
 426        self._worst_value = None
 427        self._random_value = None
 428        self._optimal_policy = dict()
 429        self._worst_policy = dict()
 430        self._otp = None
 431        self._omc = None
 432        self._osd = None
 433        self._oars = None
 434        self._oar = None
 435        self._wtp = None
 436        self._wmc = None
 437        self._wsd = None
 438        self._wars = None
 439        self._war = None
 440        self._rtp = None
 441        self._rmc = None
 442        self._rsd = None
 443        self._rars = None
 444        self._rar = None
 445
 446        if instantiate_mdp:
 447            self.instantiate_MDP()
 448
 449        # Emission map
 450        if emission_map == Tabular or emission_map is None:
 451            self.emission_map = None
 452            self.is_tabular = True
 453        else:
 454            noise_kwargs["seed"] = seed
 455            self.emission_map = emission_map(
 456                self,
 457                noise_class=noise,
 458                noise_kwargs=noise_kwargs,
 459                **emission_map_kwargs,
 460            )
 461            self.is_tabular = self.emission_map.is_tabular
 462
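    # ----------------------------------------------------------------------
    # Editor's example (not part of the library source): constructing a
    # concrete MDP. `DeepSeaEpisodic` and its `size` parameter are assumptions
    # about a concrete subclass shipped with colosseum; any non-abstract
    # subclass of BaseMDP is configured through these keyword arguments.
    #
    #   from colosseum.mdp.deep_sea import DeepSeaEpisodic
    #
    #   mdp = DeepSeaEpisodic(
    #       seed=42,
    #       size=8,                       # subclass-specific parameter
    #       p_rand=0.05,                  # 5% chance the agent's action is replaced by a random one
    #       make_reward_stochastic=True,  # sample rewards from distributions instead of their means
    #       rewards_range=(0.0, 1.0),
    #   )
    #   print(mdp.parameters)
    #   print(mdp)                        # parameters, hardness measures and graph metrics
    # ----------------------------------------------------------------------
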
 463    def instantiate_MDP(self):
 464        """
  465        instantiates the underlying graph of the MDP, the starting state distribution, and related shortcut variables.
 466        """
 467        # Instantiating the MDP
 468        self._check_parameters_in_input()
 469        self._starting_node_sampler = self._get_starting_node_sampler()
 470        self.starting_nodes = self._starting_node_sampler.next_nodes
 471        self.G = nx.DiGraph()
 472        self._instantiate_mdp()
 473        self.n_states = len(self.G.nodes)
 474
 475        # Some shortcuts variables
 476        if not self.is_episodic():
 477            self._vi, self._pe = (
 478                discounted_value_iteration,
 479                discounted_policy_evaluation,
 480            )
 481            self.random_policy = (
 482                np.ones((self.n_states, self.n_actions), dtype=np.float32)
 483                / self.n_actions
 484            )
 485
 486        # Cache state to index mapping
 487        mapping = self._rng.rand(self.n_states, self.n_actions).argsort(1)
 488        self.node_to_index = dict()
 489        self.index_to_node = dict()
 490        for i, node in enumerate(self.G.nodes):
 491            self.node_to_index[node] = i
 492            self.index_to_node[i] = node
 493
 494        # Compute the starting state distribution
 495        self.starting_state_distribution = np.zeros(self.n_states)
 496        self.starting_states = []
 497        for n, p in self._starting_node_sampler.next_nodes_and_probs:
 498            s = self.node_to_index[n]
 499            self.starting_state_distribution[s] = p
 500            self.starting_states.append(s)
 501        self.starting_states_and_probs = list(
 502            zip(self.starting_states, self._starting_node_sampler.probs)
 503        )
 504
 505    def _get_action_mapping(self, node: "NODE_TYPE") -> Tuple["ACTION_TYPE", ...]:
 506        """
 507        Returns
 508        -------
 509        Tuple["ACTION_TYPE", ...]
 510            The random action mapping for the state given in input.
 511        """
 512        if node not in self._action_mapping:
 513            self._action_mapping[node] = (
 514                self._rng.rand(self.n_actions).argsort().tolist()
 515                if self._randomize_actions
 516                else list(range(self.n_actions))
 517            )
 518        return self._action_mapping[node]
 519
 520    def _inverse_action_mapping(
 521        self, node: "NODE_TYPE", action: "ACTION_TYPE"
 522    ) -> "ACTION_TYPE":
 523        """
 524        Returns
 525        -------
 526        ACTION_TYPE
 527            The effective action corresponding to the action given in input in the given state. In other words, it
 528        reverses the random action mapping.
 529        """
 530        return self._get_action_mapping(node)[action]
 531
 532    def _produce_random_seed(self) -> int:
 533        """
 534        Returns
 535        -------
 536        int
 537            A new random seed that can be used for internal objects.
 538        """
 539        return self._fast_rng.randint(0, 10_000)
 540
 541    def _instantiate_mdp(self):
 542        """
  543        recursively instantiates the transitions of the MDP starting from each possible starting node.
 544        """
 545        for sn in self.starting_nodes:
 546            instantiate_transitions(self, sn)
 547
 548    @property
 549    def T(self) -> np.ndarray:
 550        """
 551        is an alias for the transition matrix.
 552        """
 553        return self.transition_matrix_and_rewards[0]
 554
 555    @property
 556    def R(self) -> np.ndarray:
 557        """
 558        is an alias for the rewards matrix.
 559        """
 560        return self.transition_matrix_and_rewards[1]
 561
 562    @property
 563    def recurrent_nodes_set(self) -> Iterable["NODE_TYPE"]:
 564        """
 565        Returns
 566        -------
 567        Iterable["NODE_TYPE"]
 568            The recurrent states set.
 569        """
 570        if self._recurrent_nodes_set is None:
 571            self._recurrent_nodes_set = get_recurrent_nodes_set(
 572                self.communication_class, self.G
 573            )
 574        return self._recurrent_nodes_set
 575
 576    @property
 577    def communication_class(
 578        self: Union["EpisodicMDP", "ContinuousMDP"]
 579    ) -> MDPCommunicationClass:
 580        """
 581        Returns
 582        -------
 583        MDPCommunicationClass
 584            The communication class.
 585        """
 586        if self._communication_class is None:
 587            self._communication_class = get_communication_class(
 588                self.T, self.get_episodic_graph(True) if self.is_episodic() else self.G
 589            )
 590        return self._communication_class
 591
 592    def get_optimal_policy(self, stochastic_form: bool) -> np.ndarray:
 593        """
 594        Returns
 595        -------
 596        np.ndarray
  597            The optimal policy. It can be returned either in stochastic form, i.e. as a Dirac probability
  598        distribution over actions, or as a simple mapping to integers.
 599        """
 600        if stochastic_form not in self._optimal_policy:
 601            self._optimal_policy[stochastic_form] = get_policy_from_q_values(
 602                self.optimal_value_functions[0], stochastic_form
 603            )
 604        return self._optimal_policy[stochastic_form]
 605
 606    def get_worst_policy(self, stochastic_form) -> np.ndarray:
 607        """
 608        Returns
 609        -------
 610        np.ndarray
  611            The worst performing policy. It can be returned either in stochastic form, i.e. as a Dirac probability
  612        distribution over actions, or as a simple mapping to integers.
 613        """
 614        if stochastic_form not in self._worst_policy:
 615            self._worst_policy[stochastic_form] = get_policy_from_q_values(
 616                self._vi(self.T, -self.R)[0], stochastic_form
 617            )
 618        return self._worst_policy[stochastic_form]
 619
 620    def get_value_functions(self, policy: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
 621        """
 622        Parameters
 623        ----------
 624        policy : np.ndarray
 625            a numpy array containing a continuous policy.
 626        Returns
 627        -------
 628        Q : np.ndarray
 629            the state action value function.
 630        V : np.ndarray
 631            the state value function.
 632        """
 633        return self._pe(*self.transition_matrix_and_rewards, policy)
 634
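    # ----------------------------------------------------------------------
    # Editor's example (not part of the library source): for a continuous
    # (non-episodic) `mdp`, the value functions of any stochastic policy are
    # obtained by policy evaluation.
    #
    #   Q, V = mdp.get_value_functions(mdp.random_policy)
    #   # Q is indexed by (state, action) and V by state;
    #   # `mdp.random_value_functions` caches exactly this result.
    # ----------------------------------------------------------------------
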
 635    @property
 636    def optimal_value_functions(self) -> Tuple[np.ndarray, np.ndarray]:
 637        """
 638        Returns
 639        -------
 640        Q : np.ndarray
 641            the optimal state action value function.
 642        V : np.ndarray
 643            the optimal state value function.
 644        """
 645        if self._optimal_value is None:
 646            self._optimal_value = self._vi(*self.transition_matrix_and_rewards)
 647        return self._optimal_value
 648
 649    @property
 650    def worst_value_functions(self) -> Tuple[np.ndarray, np.ndarray]:
 651        """
 652        Returns
 653        -------
 654        Q : np.ndarray
 655            the state action value function of the worst policy.
 656        V : np.ndarray
 657            the state value function of the worst policy.
 658        """
 659        if self._worst_value is None:
 660            self._worst_value = self._pe(
 661                *self.transition_matrix_and_rewards, self.get_worst_policy(True)
 662            )
 663        return self._worst_value
 664
 665    @property
 666    def random_value_functions(self) -> Tuple[np.ndarray, np.ndarray]:
 667        """
 668        Returns
 669        -------
 670        Q : np.ndarray
 671            the state action value function of the random uniform policy.
 672        V : np.ndarray
 673            the state value function of the random uniform policy.
 674        """
 675        if self._random_value is None:
 676            self._random_value = self._pe(
 677                *self.transition_matrix_and_rewards, self.random_policy
 678            )
 679        return self._random_value
 680
 681    @property
 682    def optimal_transition_probabilities(self) -> np.ndarray:
 683        """
 684        Returns
 685        -------
 686        np.ndarray
 687            The transition probabilities corresponding to the optimal policy.
 688        """
 689        if self._otp is None:
 690            T = self.T_cf if self.is_episodic() else self.T
 691            pi = (
 692                self.get_optimal_policy_continuous_form(True)
 693                if self.is_episodic()
 694                else self.get_optimal_policy(True)
 695            )
 696            self._otp = get_transition_probabilities(T, pi)
 697        return self._otp
 698
 699    @property
 700    def worst_transition_probabilities(self) -> np.ndarray:
 701        """
 702        Returns
 703        -------
 704        np.ndarray
 705            The transition probabilities corresponding to the worst policy.
 706        """
 707        if self._wtp is None:
 708            T = self.T_cf if self.is_episodic() else self.T
 709            pi = (
 710                self.get_worst_policy_continuous_form(True)
 711                if self.is_episodic()
 712                else self.get_worst_policy(True)
 713            )
 714            self._wtp = get_transition_probabilities(T, pi)
 715        return self._wtp
 716
 717    @property
 718    def random_transition_probabilities(self) -> np.ndarray:
 719        """
 720        Returns
 721        -------
 722        np.ndarray
 723            The transition probabilities corresponding to the random uniform policy.
 724        """
 725        if self._rtp is None:
 726            T = self.T_cf if self.is_episodic() else self.T
 727            pi = self.random_policy_cf if self.is_episodic() else self.random_policy
 728            self._rtp = get_transition_probabilities(T, pi)
 729        return self._rtp
 730
 731    @property
 732    def optimal_markov_chain(self) -> MarkovChain:
 733        """
 734        Returns
 735        -------
 736        MarkovChain
 737            The Markov chain corresponding to the optimal policy.
 738        """
 739        if self._omc is None:
 740            self._omc = get_markov_chain(self.optimal_transition_probabilities)
 741        return self._omc
 742
 743    @property
 744    def worst_markov_chain(self) -> MarkovChain:
 745        """
 746        Returns
 747        -------
 748        MarkovChain
 749            The Markov chain corresponding to the worst policy.
 750        """
 751        if self._wmc is None:
 752            self._wmc = get_markov_chain(self.worst_transition_probabilities)
 753        return self._wmc
 754
 755    @property
 756    def random_markov_chain(self) -> MarkovChain:
 757        """
 758        Returns
 759        -------
 760        MarkovChain
 761            The Markov chain corresponding to the random uniform policy.
 762        """
 763        if self._rmc is None:
 764            self._rmc = get_markov_chain(self.random_transition_probabilities)
 765        return self._rmc
 766
 767    def get_stationary_distribution(self, policy: np.ndarray) -> np.ndarray:
 768        """
 769        Parameters
 770        ----------
 771        policy : np.ndarray
 772            a numpy array containing a continuous policy.
 773        Returns
 774        -------
 775        np.ndarray
 776            the stationary distribution of the Markov chain yielded by the policy.
 777        """
 778        return get_stationary_distribution(
 779            get_transition_probabilities(self.T, policy),
 780            self.starting_states_and_probs,
 781        )
 782
 783    @property
 784    def optimal_stationary_distribution(self) -> np.array:
 785        """
 786        Returns
 787        -------
 788        np.ndarray
 789            the stationary distribution of the Markov chain yielded by the optimal policy.
 790        """
 791        if self._osd is None:
 792            self._osd = get_stationary_distribution(
 793                self.optimal_transition_probabilities,
 794                self.starting_states_and_probs,
 795            )
 796        return self._osd
 797
 798    @property
 799    def worst_stationary_distribution(self) -> np.array:
 800        """
 801        Returns
 802        -------
 803        np.ndarray
 804            the stationary distribution of the Markov chain yielded by the worst policy.
 805        """
 806        if self._wsd is None:
 807            self._wsd = get_stationary_distribution(
 808                self.worst_transition_probabilities,
 809                self.starting_states_and_probs,
 810            )
 811            assert not np.isnan(self._wsd).any()
 812        return self._wsd
 813
 814    @property
 815    def random_stationary_distribution(self) -> np.array:
 816        """
 817        Returns
 818        -------
 819        np.ndarray
 820            the stationary distribution of the Markov chain yielded by the random uniform policy.
 821        """
 822        if self._rsd is None:
 823            self._rsd = get_stationary_distribution(
 824                self.random_transition_probabilities,
 825                None,
 826            )
 827        return self._rsd
 828
 829    @property
 830    def optimal_average_rewards(self) -> np.ndarray:
 831        """
 832        Returns
 833        -------
 834        np.ndarray
 835            The average rewards obtained for each state when following the optimal policy.
 836        """
 837        if self._oars is None:
 838            R = self.R_cf if self.is_episodic() else self.R
 839            pi = (
 840                self.get_optimal_policy_continuous_form(True)
 841                if self.is_episodic()
 842                else self.get_optimal_policy(True)
 843            )
 844            self._oars = get_average_rewards(R, pi)
 845        return self._oars
 846
 847    @property
 848    def worst_average_rewards(self) -> np.ndarray:
 849        """
 850        Returns
 851        -------
 852        np.ndarray
 853            The average rewards obtained for each state when following the worst policy.
 854        """
 855        if self._wars is None:
 856            R = self.R_cf if self.is_episodic() else self.R
 857            pi = (
 858                self.get_worst_policy_continuous_form(True)
 859                if self.is_episodic()
 860                else self.get_worst_policy(True)
 861            )
 862            self._wars = get_average_rewards(R, pi)
 863        return self._wars
 864
 865    @property
 866    def random_average_rewards(self) -> np.ndarray:
 867        """
 868        Returns
 869        -------
 870        np.ndarray
 871            The average rewards obtained for each state when following the random uniform policy.
 872        """
 873        if self._rars is None:
 874            R = self.R_cf if self.is_episodic() else self.R
 875            pi = self.random_policy_cf if self.is_episodic() else self.random_policy
 876            self._rars = get_average_rewards(R, pi)
 877        return self._rars
 878
 879    def get_average_reward(self, policy: np.ndarray) -> float:
 880        r"""
 881        Parameters
 882        ----------
 883        policy : np.ndarray
 884            a numpy array containing a continuous policy.
 885        Returns
 886        -------
 887        average_reward : float
 888            the expected time average reward, i.e.
 889            :math:`\underset{T\to \infty}{\text{lim inf}} \frac{1}{T} \mathbb{E}_\pi \sum_{t=0}^T R_t`, associated with
 890             the policy.
 891        """
 892        sd = self.get_stationary_distribution(policy)
 893        return sum(sd * get_average_rewards(self.R, policy))
 894
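    # ----------------------------------------------------------------------
    # Editor's example (not part of the library source): the average reward of
    # a policy is its stationary distribution dotted with the per-state average
    # rewards, so for a continuous `mdp`:
    #
    #   gain = mdp.get_average_reward(mdp.get_optimal_policy(stochastic_form=True))
    #   # gain agrees with the cached property `mdp.optimal_average_reward`
    # ----------------------------------------------------------------------
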
 895    @property
 896    def optimal_average_reward(self) -> float:
 897        r"""
 898        Returns
 899        -------
 900        average_reward : float
 901            the expected time average reward, i.e.
 902            :math:`\underset{T\to \infty}{\text{lim inf}} \frac{1}{T} \mathbb{E}_\pi \sum_{t=0}^T R_t`, associated with
 903            the optimal policy.
 904        """
 905        if self._oar is None:
 906            self._oar = sum(
 907                self.optimal_stationary_distribution * self.optimal_average_rewards
 908            )
 909        return self._oar
 910
 911    @property
 912    def worst_average_reward(self) -> float:
 913        r"""
 914        Returns
 915        -------
 916        average_reward : float
 917            the expected time average reward, i.e.
 918            :math:`\underset{T\to \infty}{\text{lim inf}} \frac{1}{T} \mathbb{E}_\pi \sum_{t=0}^T R_t`, associated with
 919            the worst policy.
 920        """
 921        if self._war is None:
 922            self._war = sum(
 923                self.worst_stationary_distribution * self.worst_average_rewards
 924            )
 925        return self._war
 926
 927    @property
 928    def random_average_reward(self) -> float:
 929        r"""
 930        Returns
 931        -------
 932        average_reward : float
 933            the expected time average reward, i.e.
 934            :math:`\underset{T\to \infty}{\text{lim inf}} \frac{1}{T} \mathbb{E}_\pi \sum_{t=0}^T R_t`, associated with
 935            the random uniform policy.
 936        """
 937        if self._rar is None:
 938            self._rar = sum(
 939                self.random_stationary_distribution * self.random_average_rewards
 940            )
 941        return self._rar
 942
 943    @property
 944    def transition_matrix_and_rewards(self) -> Tuple[np.ndarray, np.ndarray]:
 945        """
 946        Returns
 947        -------
 948        np.ndarray
 949            The transition probabilities 3d numpy array.
 950        np.ndarray
 951            The reward matrix.
 952        """
 953        if self._transition_matrix_and_rewards is None:
 954            self._transition_matrix_and_rewards = get_transition_matrix_and_rewards(
 955                self.n_states,
 956                self.n_actions,
 957                self.G,
 958                self.get_info_class,
 959                self.get_reward_distribution,
 960                self.node_to_index,
 961                self._force_sparse_transition,
 962            )
 963        return self._transition_matrix_and_rewards
 964
 965    @property
 966    def graph_layout(self) -> Dict["NODE_TYPE", Tuple[float, float]]:
 967        """
 968        Returns
 969        -------
 970        Dict["NODE_TYPE", Tuple[float, float]]
 971            The graph layout for the MDP. It can be customized by implementing the `custom_graph_layout` function.
 972        """
 973        if self._graph_layout is None:
 974            self._graph_layout = (
 975                self.custom_graph_layout()
 976                if hasattr(self, "custom_graph_layout")
 977                else nx.nx_agraph.graphviz_layout(self.G)
 978            )
 979        return self._graph_layout
 980
 981    @property
 982    def graph_metrics(self) -> Dict[str, Any]:
 983        """
 984        Returns
 985        -------
 986        Dict[str, Any]
 987            The dictionary containing graph metrics for the graph underlying the MDP.
 988        """
 989        if self._graph_metrics is None:
 990            G = self.get_episodic_graph(True) if self.is_episodic() else self.G
 991            self._graph_metrics = dict()
 992            self._graph_metrics["# nodes"] = len(G.nodes)
 993            self._graph_metrics["# edges"] = len(G.edges)
 994        return self._graph_metrics
 995
 996    @property
 997    def diameter(self: Union["EpisodicMDP", "ContinuousMDP"]) -> float:
 998        """
 999        Returns
1000        -------
1001        float
1002            The diameter of the MDP.
1003        """
1004        if self._diameter is None:
1005            if self.hardness_report:
1006                self._diameter = self.hardness_report["MDP measure of hardness"][
1007                    "diameter"
1008                ]
1009            else:
1010                self._diameter = get_diameter(
1011                    self.episodic_transition_matrix_and_rewards[0]
1012                    if self.is_episodic()
1013                    else self.T,
1014                    self.is_episodic(),
1015                )
1016        return self._diameter
1017
1018    @property
1019    def sum_reciprocals_suboptimality_gaps(
1020        self: Union["EpisodicMDP", "ContinuousMDP"]
1021    ) -> float:
1022        """
1023        Returns
1024        -------
1025        float
1026            The sum of the reciprocals of the sub-optimality gaps
1027        """
1028        if self._sum_reciprocals_suboptimality_gaps is None:
1029            if self.hardness_report:
1030                self._sum_reciprocals_suboptimality_gaps = self.hardness_report[
1031                    "MDP measure of hardness"
1032                ]["suboptimal_gaps"]
1033            else:
1034                self._sum_reciprocals_suboptimality_gaps = (
1035                    get_sum_reciprocals_suboptimality_gaps(
1036                        *self.optimal_value_functions,
1037                        self.reachable_states if self.is_episodic() else None,
1038                    )
1039                )
1040        return self._sum_reciprocals_suboptimality_gaps
1041
1042    def _compute_value_norm(self, discounted: bool) -> float:
1043        """
1044        Returns
1045        -------
1046        float
 1047            The environmental value norm in the discounted or undiscounted setting.
1048        """
1049        T, R = (self.T_cf, self.R_cf) if self.is_episodic() else (self.T, self.R)
1050        V = (
1051            self.optimal_value_continuous_form[1]
1052            if self.is_episodic()
1053            else self.optimal_value_functions[1]
1054        )
1055        if discounted:
1056            return calculate_norm_discounted(T, V)
1057        return calculate_norm_average(
1058            T, self.optimal_transition_probabilities, self.optimal_average_rewards
1059        )
1060
1061    @property
1062    def discounted_value_norm(self) -> float:
1063        """
1064        Returns
1065        -------
1066        float
1067            The environmental value norm in the discounted setting.
1068        """
1069        if True not in self._optimal_value_norm:
1070            if (
1071                self._are_all_transition_deterministic
1072                and self._are_all_rewards_deterministic
1073            ):
1074                self._optimal_value_norm[True] = 0.0
1075            elif self.hardness_report:
1076                self._optimal_value_norm[True] = self.hardness_report[
1077                    "MDP measure of hardness"
1078                ]["value_norm"]
1079            else:
1080                self._optimal_value_norm[True] = self._compute_value_norm(True)
1081        return self._optimal_value_norm[True]
1082
1083    @property
1084    def undiscounted_value_norm(self) -> float:
1085        """
1086        Returns
1087        -------
1088        float
1089            The environmental value norm in the undiscounted setting.
1090        """
1091        if False not in self._optimal_value_norm:
1092            self._optimal_value_norm[False] = self._compute_value_norm(False)
1093        return self._optimal_value_norm[False]
1094
1095    @property
1096    def value_norm(self):
1097        """
1098        is an alias for the discounted value norm.
1099        """
1100        return self.discounted_value_norm
1101
1102    @property
1103    def measures_of_hardness(self) -> Dict[str, float]:
1104        """
1105        Returns
1106        -------
1107        Dict[str, float]
 1108            The dictionary containing all the available measures of hardness.
1109        """
1110        return dict(
1111            diameter=self.diameter,
1112            suboptimal_gaps=self.sum_reciprocals_suboptimality_gaps,
1113            value_norm=self.value_norm,
1114        )
1115
1116    @property
1117    def summary(self) -> Dict[str, Dict[str, Any]]:
1118        """
1119        Returns
1120        -------
1121        Dict[str, Dict[str, Any]]
1122            The dictionary with information about the parameters, the measures of hardness and the graph metrics.
1123        """
1124        if self._summary is None:
1125            self._summary = {
1126                "Parameters": clean_for_storing(self.parameters),
1127                "Measure of hardness": clean_for_storing(self.measures_of_hardness),
1128                "Graph metrics": clean_for_storing(self.graph_metrics),
1129            }
1130        return self._summary
1131
1132    @property
1133    def hardness_report(self) -> Union[Dict, None]:
1134        """
 1135        looks for a cached hardness report in the hardness-reports folder configured when the MDP was created. It
 1136        returns None if no report is found.
1137        """
1138        if self._hr is None:
1139            report_file = find_hardness_report_file(self, self._hardness_reports_folder)
1140            if report_file:
1141                with open(report_file, "r") as f:
1142                    report = yaml.load(f, yaml.Loader)
1143                self._hr = report
1144            else:
1145                self._hr = False
1146        if self._hr:
1147            return self._hr
1148        return None
1149
1150    def get_info_class(self, n: "NODE_TYPE") -> NodeInfoClass:
1151        """
1152        Returns
1153        -------
1154        NodeInfoClass
1155            The container class (NodeInfoClass) associated with state n.
1156        """
1157        return self.G.nodes[n]["info_class"]
1158
1159    def get_transition_distributions(
1160        self, node: "NODE_TYPE"
1161    ) -> Dict[int, Union[rv_continuous, NextStateSampler]]:
1162        """
1163        Returns
1164        -------
1165        Dict[int, Union[rv_continuous, NextStateSampler]]
1166            The dictionary containing the transition distributions for any action at the state given in input.
1167        """
1168        return self.get_info_class(node).transition_distributions
1169
1170    def get_reward_distribution(
1171        self, node: "NODE_TYPE", action: "ACTION_TYPE", next_node: "NODE_TYPE"
1172    ) -> rv_continuous:
1173        """
1174        Returns
1175        -------
1176        rv_continuous
 1177            The reward distribution for transitioning to next_node when selecting action in node.
1178        """
1179        if (node, action, next_node) not in self._cached_reward_distributions:
1180            self._cached_reward_distributions[
1181                node, action, next_node
1182            ] = self._get_reward_distribution(
1183                node, self._inverse_action_mapping(node, action), next_node
1184            )
1185        return self._cached_reward_distributions[node, action, next_node]
1186
1187    def sample_reward(
1188        self, node: "NODE_TYPE", action: "ACTION_TYPE", next_node: "NODE_TYPE"
1189    ) -> float:
1190        """
1191        Returns
1192        -------
1193        float
 1194            A sample from the reward distribution obtained when transitioning to next_node after selecting action in node.
1195        """
1196        if (node, action, next_node) not in self._cached_rewards or len(
1197            self._cached_rewards[node, action, next_node]
1198        ) == 0:
1199            self._cached_rewards[node, action, next_node] = (
1200                self.get_reward_distribution(node, action, next_node)
1201                .rvs(5000, random_state=self._rng)
1202                .tolist()
1203            )
1204        r = self._cached_rewards[node, action, next_node].pop(0)
1205        return (
 1206            r * (self.rewards_range[1] - self.rewards_range[0]) + self.rewards_range[0]
1207        )
1208
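    # ----------------------------------------------------------------------
    # Editor's example (not part of the library source): reward distributions
    # are defined on [0, 1] and samples are rescaled to `rewards_range`,
    # i.e. r_scaled = r * (r_max - r_min) + r_min.
    #
    #   mdp.reset()
    #   node, action = mdp.cur_node, 0
    #   next_node = mdp.get_info_class(node).sample_next_state(action)
    #   r = mdp.sample_reward(node, action, next_node)
    #   assert mdp.rewards_range[0] <= r <= mdp.rewards_range[1]
    # ----------------------------------------------------------------------
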
1209    def get_measure_from_name(self, measure_name: str) -> float:
1210        """
1211        Parameters
1212        ----------
1213        measure_name : str
1214            the code name for the measure. The available code names can be obtained using the
1215            ``BaseMDP.get_available_hardness_measures`` function.
1216        Returns
1217        -------
1218        float
1219            The value for the hardness measure.
1220        """
1221        if measure_name == "diameter":
1222            return self.diameter
1223        elif measure_name in ["value_norm", "environmental_value_norm"]:
1224            return self.value_norm
1225        elif measure_name == "suboptimal_gaps":
1226            return self.sum_reciprocals_suboptimality_gaps
1227        else:
1228            raise ValueError(
1229                f"{measure_name} is not a valid hardness measure name: available ones are "
1230                + str(self.get_available_hardness_measures())
1231            )
1232
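    # ----------------------------------------------------------------------
    # Editor's example (not part of the library source): querying the hardness
    # measures by name for an instantiated `mdp`.
    #
    #   for name in BaseMDP.get_available_hardness_measures():
    #       print(name, mdp.get_measure_from_name(name))
    #   # equivalent to inspecting the `mdp.measures_of_hardness` dictionary
    # ----------------------------------------------------------------------
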
1233    def action_spec(self) -> DiscreteArray:
1234        """
1235        Returns
1236        -------
1237        DiscreteArray
1238            The action spec of the environment.
1239        """
1240        return DiscreteArray(self.n_actions, name="action")
1241
1242    def observation_spec(self) -> Array:
1243        """
1244        Returns
1245        -------
1246        Array
1247            The observation spec of the environment.
1248        """
1249        if self.emission_map is None:
1250            return DiscreteArray(self.n_states, name="observation")
1251        obs = self.get_observation(self.starting_nodes[0], 0)
1252        return BoundedArray(obs.shape, obs.dtype, -np.inf, np.inf, "observation")
1253
1254    def get_observation(
1255        self, node: "NODE_TYPE", h: int = None
1256    ) -> Union[int, np.ndarray]:
1257        """
1258        Returns
1259        -------
1260        Union[int, np.ndarray]
 1261            The representation corresponding to the state given in input. Episodic MDPs also require the current
1262        in-episode time step.
1263        """
1264        if self.emission_map is None:
 1265            return self.node_to_index[node]
1266        return self.emission_map.get_observation(node, h)
1267
1268    def reset(self) -> dm_env.TimeStep:
1269        """
1270        resets the environment to a newly sampled starting state.
1271        """
1272        self.necessary_reset = False
1273        self.h = 0
1274        self.cur_node = self.last_starting_node = self._starting_node_sampler.sample()
1275        node_info_class = self.get_info_class(self.cur_node)
1276        node_info_class.update_visitation_counts()
1277        return dm_env.restart(self.get_observation(self.cur_node, self.h))
1278
1279    def step(self, action: int, auto_reset=False) -> dm_env.TimeStep:
1280        """
 1281        takes a step in the MDP for the given action. When auto_reset is set to True, it automatically resets the MDP
 1282        at the end of each episode.
1283        Returns
1284        -------
1285        dm_env.TimeStep
1286            The TimeStep object containing the transition information.
1287        """
1288        if auto_reset and self.necessary_reset:
1289            return self.reset()
1290        assert not self.necessary_reset
1291
 1292        # This can be the in-episode time step (episodic setting) or the total number of time steps (continuous setting)
1293        self.h += 1
1294
1295        # In case the action is a numpy array
1296        action = int(action)
1297
1298        # Moving the current state according to the action played
1299        old_node = self.cur_node
1300        self.cur_node = self.get_info_class(old_node).sample_next_state(action)
1301        self.last_edge = old_node, self.cur_node
1302        node_info_class = self.get_info_class(self.cur_node)
1303        node_info_class.update_visitation_counts(action)
1304
1305        # Calculate reward and observation
1306        reward = self.sample_reward(old_node, action, self.cur_node)
1307        observation = self.get_observation(self.cur_node, self.h)
1308
1309        # Wrapping the time step in a dm_env.TimeStep
1310        if self.is_episodic() and self.h >= self.H:
1311            self.necessary_reset = True
1312            if self.emission_map is None:
1313                observation = -1
1314            else:
1315                observation = np.zeros_like(self.observation_spec().generate_value())
1316            return dm_env.termination(reward=reward, observation=observation)
1317        return dm_env.transition(reward=reward, observation=observation)
1318
1319    def random_steps(
1320        self, n: int, auto_reset=False
1321    ) -> List[Tuple[dm_env.TimeStep, int]]:
1322        """
 1323        takes n steps with random actions and returns, for each step, the resulting time step and the random action.
 1324        If auto_reset is set to True, it automatically resets episodic MDPs at the end of each episode.
1325
1326        Returns
1327        -------
 1328        List[Tuple[dm_env.TimeStep, int]]
 1329            The list of sampled transitions.
 1330            Each element pairs the TimeStep object containing the transition information with the integer
 1331            action performed.
1332        """
1333
1334        data = []
1335        for _ in range(n):
1336            action = int(self._rng.randint(self.action_spec().num_values))
1337            ts = self.step(action, auto_reset)
1338            data.append((ts, action))
1339        return data
1340
1341    def random_step(self, auto_reset=False) -> Tuple[dm_env.TimeStep, int]:
1342        """
 1343        takes a step with a random action and returns both the resulting time step and the random action. If auto_reset
 1344        is set to True, it automatically resets episodic MDPs at the end of the episode.
1345
1346        Returns
1347        -------
1348        dm_env.TimeStep
1349            The TimeStep object containing the transition information.
1350        int
1351            The action performed.
1352        """
1353        action = int(self._rng.randint(self.action_spec().num_values))
1354        ts = self.step(action, auto_reset)
1355        return ts, action
1356
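    # ----------------------------------------------------------------------
    # Editor's example (not part of the library source): a short interaction
    # loop following the dm_env interface, assuming `mdp` is an instantiated
    # (episodic or continuous) MDP.
    #
    #   ts = mdp.reset()
    #   for _ in range(100):
    #       ts, action = mdp.random_step(auto_reset=True)   # resets automatically at episode end
    #   print(mdp.get_visitation_counts())
    # ----------------------------------------------------------------------
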
1357    def get_visitation_counts(
1358        self, state_only=True
1359    ) -> Dict[Union["NODE_TYPE", Tuple["NODE_TYPE", "ACTION_TYPE"]], int]:
1360        """
1361        when state_only is True it returns the visitation counts for the states and when it is False it returns the
1362        visitation counts for state action pairs.
1363        """
1364        if state_only:
1365            return {
1366                node: self.get_info_class(node).state_visitation_count
1367                for node in self.G.nodes
1368            }
1369        return {
1370            (node, a): self.get_info_class(node).actions_visitation_count[a]
1371            for node in self.G.nodes
1372            for a in range(self.n_actions)
1373        }
1374
1375    def reset_visitation_counts(self):
1376        """
1377        resets the visitation counts to zero for all states and state action pairs.
1378        """
1379        for node in self.G.nodes:
1380            self.get_info_class(node).state_visitation_count = 0
1381            for a in range(self.n_actions):
1382                self.get_info_class(node).actions_visitation_count[a] = 0
1383
1384    def get_value_node_labels(
1385        self: Union["ContinuousMDP", "EpisodicMDP"], V: np.ndarray = None
1386    ) -> Dict["NODE_TYPE", float]:
1387        """
1388        returns a mapping from state to state values. By default, it uses the optimal values.
1389        """
1390        if V is None:
1391            _, V = self.optimal_value_functions
1392        else:
 1393            if self.is_episodic():
1394                h, d = V.shape
1395                assert h == self.H and d == self.n_states
1396            else:
1397                assert len(V) == self.n_states
1398        return {
1399            node: np.round(
1400                V[0, self.node_to_index[node]]
1401                if self.is_episodic()
1402                else V[self.node_to_index[node]],
1403                2,
1404            )
1405            for node in self.G.nodes
1406        }
class BaseMDP(dm_env._environment.Environment, abc.ABC):

The base class for MDPs.

@abc.abstractmethod
BaseMDP( seed: int, randomize_actions: bool = True, make_reward_stochastic=False, reward_variance_multiplier: float = 1.0, p_lazy: float = None, p_rand: float = None, rewards_range: Tuple[float, float] = (0.0, 1.0), emission_map: Type[colosseum.emission_maps.base.EmissionMap] = <class 'colosseum.emission_maps.tabular.Tabular'>, emission_map_kwargs: Dict[str, Any] = {}, noise: Type[colosseum.emission_maps.base.EmissionMap] = None, noise_kwargs: Dict[str, Any] = {}, instantiate_mdp: bool = True, force_sparse_transition: bool = False, exclude_horizon_from_parameters: bool = False)
Parameters
  • seed (int): The random seed.
  • randomize_actions (bool): If True, the outcome of the actions is randomized across states. By default, it is set to True.
  • make_reward_stochastic (bool): If True, the rewards of the MDP will be stochastic. By default, it is set to False.
  • reward_variance_multiplier (float): A multiplier for the variance of the rewards that keeps the mean fixed. The lower the value, the higher the variance. By default, it is set to 1.
  • p_lazy (float): The probability that an MDP stays in the same state instead of executing the action selected by an agent. By default, it is set to zero.
  • p_rand (float): The probability of selecting an action at random instead of the one specified by the agent. By default, it is set to zero.
  • rewards_range (Tuple[float, float]): The maximum range of values for the rewards. By default, it is (0, 1).
  • emission_map (Type[EmissionMap]): The emission map that renders the MDP non-tabular. By default, no emission map is used.
  • emission_map_kwargs (Dict[str, Any]): The keyword arguments for the emission map.
  • noise (Type[EmissionMap]): The noise class to make the emission map stochastic. By default, the emission map is left deterministic.
  • noise_kwargs (Dict[str, Any]): The keyword arguments for the noise.
  • instantiate_mdp (bool): If True, the underlying graph for the MDP is immediately instantiated. By default, it is set to True.
  • force_sparse_transition (bool): If True, the transition matrix is forced to be stored in a sparse matrix format. By default, it is not enforced.
  • exclude_horizon_from_parameters (bool): If True, in the episodic case, the horizon is not considered a parameter when computing the parameter properties and the hash. By default, it is not excluded.
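As a minimal usage sketch of these constructor arguments (assuming a concrete subclass such as RiverSwimContinuous from colosseum.mdp.river_swim is available and forwards its keyword arguments to BaseMDP; any other colosseum MDP class should work analogously):

from colosseum.mdp.river_swim import RiverSwimContinuous  # assumed concrete subclass

mdp = RiverSwimContinuous(
    seed=42,
    randomize_actions=True,
    make_reward_stochastic=True,
    reward_variance_multiplier=1.0,
    p_rand=0.05,
    rewards_range=(0.0, 1.0),
)
print(mdp.n_states, mdp.n_actions, mdp.is_tabular)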
@staticmethod
@abc.abstractmethod
def get_unique_symbols() -> List[str]:
51    @staticmethod
52    @abc.abstractmethod
53    def get_unique_symbols() -> List[str]:
54        """
55        Returns
56        -------
57        List[str]
58            the unique symbols of the grid representation of the MDP.
59        """
Returns
  • List[str]: the unique symbols of the grid representation of the MDP.
@staticmethod
def get_available_hardness_measures() -> List[str]:
61    @staticmethod
62    def get_available_hardness_measures() -> List[str]:
63        """
64        Returns
65        -------
66        List[str]
67            The hardness measures available in the package.
68        """
69        return ["diameter", "value_norm", "suboptimal_gaps"]
Returns
  • List[str]: The hardness measures available in the package.
@staticmethod
def produce_gin_file_from_mdp_parameters(parameters: Dict[str, Any], mdp_class_name: str, index: int = 0) -> str:
71    @staticmethod
72    def produce_gin_file_from_mdp_parameters(
73        parameters: Dict[str, Any], mdp_class_name: str, index: int = 0
74    ) -> str:
75        """
76        Parameters
77        ----------
78        parameters : Dict[str, Any]
79            The parameters of the MDP.
80        mdp_class_name : str
81            The name of the class of the MDP.
82        index : int
83            The index for the gin configuration
84
85        Returns
86        -------
87        str
88            The gin configuration for the given parameters and index.
89        """
90
91        string = ""
92        for k, v in parameters.items():
93            string += f"prms_{index}/{mdp_class_name}.{k} = {v}\n"
94        return string
Parameters
  • parameters (Dict[str, Any]): The parameters of the MDP.
  • mdp_class_name (str): The name of the class of the MDP.
  • index (int): The index for the gin configuration
Returns
  • str: The gin configuration for the given parameters and index.
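For illustration, the gin string produced for a parameter dictionary follows directly from the f-string in the source above (the parameter names and class name below are made up, purely for illustration):

from colosseum.mdp.base import BaseMDP

gin_str = BaseMDP.produce_gin_file_from_mdp_parameters(
    {"size": 10, "p_rand": 0.1},            # hypothetical parameters
    mdp_class_name="RiverSwimContinuous",   # example class name
    index=0,
)
print(gin_str)
# prms_0/RiverSwimContinuous.size = 10
# prms_0/RiverSwimContinuous.p_rand = 0.1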
@staticmethod
@abc.abstractmethod
def does_seed_change_MDP_structure() -> bool:
 96    @staticmethod
 97    @abc.abstractmethod
 98    def does_seed_change_MDP_structure() -> bool:
 99        """
100        Returns
101        -------
102        bool
103            True if changing the seed changes the transition matrix and/or the rewards matrix. This may happen,
104        for example, when there are fewer starting states than possible ones and the effective starting states
105        are picked at random based on the seed.
106        """
Returns
  • bool: True if changing the seed changes the transition matrix and/or the rewards matrix. This may happen, for example, when there are fewer starting states than possible ones and the effective starting states are picked at random based on the seed.
@staticmethod
@abc.abstractmethod
def is_episodic() -> bool:
108    @staticmethod
109    @abc.abstractmethod
110    def is_episodic() -> bool:
111        """
112        Returns
113        -------
114        bool
115            True if the MDP is episodic.
116        """
Returns
  • bool: True if the MDP is episodic.
@staticmethod
@abc.abstractmethod
def sample_parameters(n: int, seed: int = None) -> List[Dict[str, Any]]:
118    @staticmethod
119    @abc.abstractmethod
120    def sample_parameters(n: int, seed: int = None) -> List[Dict[str, Any]]:
121        """
122        Returns
123        -------
124        List[Dict[str, Any]]
125            n sampled parameters that can be used to construct an MDP in a reasonable amount of time.
126        """
Returns
  • List[Dict[str, Any]]: n sampled parameters that can be used to construct an MDP in a reasonable amount of time.
@staticmethod
@abc.abstractmethod
def sample_mdp_parameters(n: int, is_episodic: bool, seed: int = None) -> List[Dict[str, Any]]:
128    @staticmethod
129    @abc.abstractmethod
130    def sample_mdp_parameters(
131        n: int, is_episodic: bool, seed: int = None
132    ) -> List[Dict[str, Any]]:
133        """
134        Returns
135        -------
136        List[Dict[str, Any]]
137            n sampled parameters that can be used to construct an MDP in a reasonable amount of time.
138        """
Returns
  • List[Dict[str, Any]]: n sampled parameters that can be used to construct an MDP in a reasonable amount of time.
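A short sketch of how the sampled parameters are meant to be used (assuming RiverSwimContinuous implements sample_parameters; the sampled dictionaries are keyword arguments for the constructor of the same class):

from colosseum.mdp.river_swim import RiverSwimContinuous  # assumed concrete subclass

for params in RiverSwimContinuous.sample_parameters(n=3, seed=0):
    print(params)
    # each dictionary can be unpacked into the constructor, e.g.
    # mdp = RiverSwimContinuous(seed=0, **params), provided it does not already contain a seed entry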
140    @staticmethod
141    @abc.abstractmethod
142    def get_node_class() -> Type["NODE_TYPE"]:
143        """
144        Returns
145        -------
146        Type["NODE_TYPE"]
147            The class of the nodes of the MDP.
148        """
Returns
  • Type["NODE_TYPE"]: The class of the nodes of the MDP.
n_actions: int
Returns
  • int: The number of available actions.
200    @abc.abstractmethod
201    def get_grid_representation(self, node: "NODE_TYPE", h: int = None) -> np.ndarray:
202        """
203        Returns
204        -------
205        np.ndarray
206            An ASCII representation of the state given in input stored as numpy array.
207        """
Returns
  • np.ndarray: An ASCII representation of the state given in input stored as numpy array.
parameters: Dict[str, Any]
Returns
  • Dict[str, Any]: The parameters of the MDP.
@abc.abstractmethod
def get_gin_parameters(self, index: int) -> str:
247    @abc.abstractmethod
248    def get_gin_parameters(self, index: int) -> str:
249        """
250        Returns
251        -------
252        str
253            The gin config of the MDP instance.
254        """
Returns
  • str: The gin config of the MDP instance.
def get_gin_config(self, index: int) -> str:
256    def get_gin_config(self, index: int) -> str:
257        """
258        Returns
259        -------
260        str
261            The gin config file of the MDP.
262        """
263        return "".join(self.get_gin_parameters(index))
Returns
  • str: The gin config file of the MDP.
265    def get_node_labels(self, l: List[Any]) -> Dict["NODE_TYPE", Any]:
266        """
267        Returns
268        -------
269        Dict["NODE_TYPE", Any]
270            A dictionary that assigns to each node the value of the list in the corresponding index.
271        """
272        assert len(l) == self.n_states, (
273            f"The array in input should have the same dimensionality of the number of states {self.n_states}, "
274            f" {len(l)} long array received instead."
275        )
276        return {self.index_to_node[i]: l[i] for i in range(len(l))}
Returns
  • Dict["NODE_TYPE", Any]: A dictionary that assigns to each node the value of the list in the corresponding index.
278    def get_node_action_labels(self, l: List[List[Any]]) -> Dict["NODE_TYPE", float]:
279        """
280        Returns
281        -------
282        Dict["NODE_TYPE", Any]
283            A dictionary that assigns to each node and action the value of the list in the corresponding index.
284        """
285        assert len(l) == self.n_states, (
286            f"The list in input should be the same length of the number of states {self.n_states}, "
287            f" list of length {len(l)} received instead."
288        )
289        assert all(
290            len(ll) == self.n_actions for ll in l
291        ), f"The lists inside the lists should be the same length as the number of actions {self.n_actions}, "
292        return {
293            (self.index_to_node[i], action): l[i][action]
294            for i in range(len(l))
295            for action in range(self.n_actions)
296        }
Returns
  • Dict["NODE_TYPE", Any]: A dictionary that assigns to each node and action the value of the list in the corresponding index.
hash: str
Returns
  • str: The hash value based on the parameters of the MDP. This can be used to create cache files.
def instantiate_MDP(self):
464    def instantiate_MDP(self):
465        """
466        recursively instantiates the MDP.
467        """
468        # Instantiating the MDP
469        self._check_parameters_in_input()
470        self._starting_node_sampler = self._get_starting_node_sampler()
471        self.starting_nodes = self._starting_node_sampler.next_nodes
472        self.G = nx.DiGraph()
473        self._instantiate_mdp()
474        self.n_states = len(self.G.nodes)
475
476        # Some shortcuts variables
477        if not self.is_episodic():
478            self._vi, self._pe = (
479                discounted_value_iteration,
480                discounted_policy_evaluation,
481            )
482            self.random_policy = (
483                np.ones((self.n_states, self.n_actions), dtype=np.float32)
484                / self.n_actions
485            )
486
487        # Cache state to index mapping
488        mapping = self._rng.rand(self.n_states, self.n_actions).argsort(1)
489        self.node_to_index = dict()
490        self.index_to_node = dict()
491        for i, node in enumerate(self.G.nodes):
492            self.node_to_index[node] = i
493            self.index_to_node[i] = node
494
495        # Compute the starting state distribution
496        self.starting_state_distribution = np.zeros(self.n_states)
497        self.starting_states = []
498        for n, p in self._starting_node_sampler.next_nodes_and_probs:
499            s = self.node_to_index[n]
500            self.starting_state_distribution[s] = p
501            self.starting_states.append(s)
502        self.starting_states_and_probs = list(
503            zip(self.starting_states, self._starting_node_sampler.probs)
504        )

recursively instantiates the MDP.

T: numpy.ndarray

is an alias for the transition matrix.

R: numpy.ndarray

is an alias for the rewards matrix.

communication_class: colosseum.mdp.utils.communication_class.MDPCommunicationClass
Returns
  • MDPCommunicationClass: The communication class of the MDP.
def get_optimal_policy(self, stochastic_form: bool) -> numpy.ndarray:
593    def get_optimal_policy(self, stochastic_form: bool) -> np.ndarray:
594        """
595        Returns
596        -------
597        np.ndarray
598            The optimal policy. It can be returned either in stochastic form, i.e. as a Dirac probability
599        distribution, or as a simple mapping to integers.
600        """
601        if stochastic_form not in self._optimal_policy:
602            self._optimal_policy[stochastic_form] = get_policy_from_q_values(
603                self.optimal_value_functions[0], stochastic_form
604            )
605        return self._optimal_policy[stochastic_form]
Returns
  • np.ndarray: The optimal policy. It can be returned either in stochastic form, i.e. as a Dirac probability distribution, or as a simple mapping to integers.
def get_worst_policy(self, stochastic_form) -> numpy.ndarray:
607    def get_worst_policy(self, stochastic_form) -> np.ndarray:
608        """
609        Returns
610        -------
611        np.ndarray
612            The worst-performing policy. It can be returned either in stochastic form, i.e. as a Dirac probability
613        distribution, or as a simple mapping to integers.
614        """
615        if stochastic_form not in self._worst_policy:
616            self._worst_policy[stochastic_form] = get_policy_from_q_values(
617                self._vi(self.T, -self.R)[0], stochastic_form
618            )
619        return self._worst_policy[stochastic_form]
Returns
  • np.ndarray: The worst-performing policy. It can be returned either in stochastic form, i.e. as a Dirac probability distribution, or as a simple mapping to integers.
def get_value_functions(self, policy: numpy.ndarray) -> Tuple[numpy.ndarray, numpy.ndarray]:
621    def get_value_functions(self, policy: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
622        """
623        Parameters
624        ----------
625        policy : np.ndarray
626            a numpy array containing a continuous policy.
627        Returns
628        -------
629        Q : np.ndarray
630            the state action value function.
631        V : np.ndarray
632            the state value function.
633        """
634        return self._pe(*self.transition_matrix_and_rewards, policy)
Parameters
  • policy (np.ndarray): a numpy array containing a continuous policy.
Returns
  • Q (np.ndarray): the state action value function.
  • V (np.ndarray): the state value function.
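As a small sketch in the continuous (non-episodic) setting (assuming RiverSwimContinuous is available), the random uniform policy built in instantiate_MDP can be evaluated and compared with the optimal and worst value functions exposed below:

from colosseum.mdp.river_swim import RiverSwimContinuous  # assumed concrete subclass

mdp = RiverSwimContinuous(seed=42)

Q, V = mdp.get_value_functions(mdp.random_policy)   # random uniform policy
Q_star, V_star = mdp.optimal_value_functions
Q_worst, V_worst = mdp.worst_value_functions
print(V_worst.mean(), V.mean(), V_star.mean())      # expected ordering: worst <= random <= optimal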
optimal_value_functions: Tuple[numpy.ndarray, numpy.ndarray]
Returns
  • Q (np.ndarray): the optimal state action value function.
  • V (np.ndarray): the optimal state value function.
worst_value_functions: Tuple[numpy.ndarray, numpy.ndarray]
Returns
  • Q (np.ndarray): the state action value function of the worst policy.
  • V (np.ndarray): the state value function of the worst policy.
random_value_functions: Tuple[numpy.ndarray, numpy.ndarray]
Returns
  • Q (np.ndarray): the state action value function of the random uniform policy.
  • V (np.ndarray): the state value function of the random uniform policy.
optimal_transition_probabilities: numpy.ndarray
Returns
  • np.ndarray: The transition probabilities corresponding to the optimal policy.
worst_transition_probabilities: numpy.ndarray
Returns
  • np.ndarray: The transition probabilities corresponding to the worst policy.
random_transition_probabilities: numpy.ndarray
Returns
  • np.ndarray: The transition probabilities corresponding to the random uniform policy.
optimal_markov_chain: pydtmc.markov_chain.MarkovChain
Returns
  • MarkovChain: The Markov chain corresponding to the optimal policy.
worst_markov_chain: pydtmc.markov_chain.MarkovChain
Returns
  • MarkovChain: The Markov chain corresponding to the worst policy.
random_markov_chain: pydtmc.markov_chain.MarkovChain
Returns
  • MarkovChain: The Markov chain corresponding to the random uniform policy.
def get_stationary_distribution(self, policy: numpy.ndarray) -> numpy.ndarray:
768    def get_stationary_distribution(self, policy: np.ndarray) -> np.ndarray:
769        """
770        Parameters
771        ----------
772        policy : np.ndarray
773            a numpy array containing a continuous policy.
774        Returns
775        -------
776        np.ndarray
777            the stationary distribution of the Markov chain yielded by the policy.
778        """
779        return get_stationary_distribution(
780            get_transition_probabilities(self.T, policy),
781            self.starting_states_and_probs,
782        )
Parameters
  • policy (np.ndarray): a numpy array containing a continuous policy.
Returns
  • np.ndarray: the stationary distribution of the Markov chain yielded by the policy.
optimal_stationary_distribution: numpy.ndarray
Returns
  • np.ndarray: the stationary distribution of the Markov chain yielded by the optimal policy.
worst_stationary_distribution: numpy.ndarray
Returns
  • np.ndarray: the stationary distribution of the Markov chain yielded by the worst policy.
random_stationary_distribution: numpy.ndarray
Returns
  • np.ndarray: the stationary distribution of the Markov chain yielded by the random uniform policy.
optimal_average_rewards: numpy.ndarray
Returns
  • np.ndarray: The average rewards obtained for each state when following the optimal policy.
worst_average_rewards: numpy.ndarray
Returns
  • np.ndarray: The average rewards obtained for each state when following the worst policy.
random_average_rewards: numpy.ndarray
Returns
  • np.ndarray: The average rewards obtained for each state when following the random uniform policy.
def get_average_reward(self, policy: numpy.ndarray) -> float:
880    def get_average_reward(self, policy: np.ndarray) -> float:
881        r"""
882        Parameters
883        ----------
884        policy : np.ndarray
885            a numpy array containing a continuous policy.
886        Returns
887        -------
888        average_reward : float
889            the expected time average reward, i.e.
890            :math:`\underset{T\to \infty}{\text{lim inf}} \frac{1}{T} \mathbb{E}_\pi \sum_{t=0}^T R_t`, associated with
891             the policy.
892        """
893        sd = self.get_stationary_distribution(policy)
894        return sum(sd * get_average_rewards(self.R, policy))
Parameters
  • policy (np.ndarray): a numpy array containing a continuous policy.
Returns
  • average_reward (float): the expected time average reward, i.e. \( \underset{T\to \infty}{\text{lim inf}} \frac{1}{T} \mathbb{E}_\pi \sum_{t=0}^T R_t \), associated with the policy.
optimal_average_reward: float
Returns
  • average_reward (float): the expected time average reward, i.e. \( \underset{T\to \infty}{\text{lim inf}} \frac{1}{T} \mathbb{E}_\pi \sum_{t=0}^T R_t \), associated with the optimal policy.
worst_average_reward: float
Returns
  • average_reward (float): the expected time average reward, i.e. \( \underset{T\to \infty}{\text{lim inf}} \frac{1}{T} \mathbb{E}_\pi \sum_{t=0}^T R_t \), associated with the worst policy.
random_average_reward: float
Returns
  • average_reward (float): the expected time average reward, i.e. \( \underset{T\to \infty}{\text{lim inf}} \frac{1}{T} \mathbb{E}_\pi \sum_{t=0}^T R_t \), associated with the random uniform policy.
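A brief sketch tying these quantities together (assuming RiverSwimContinuous is available): the stationary distribution of the optimal policy sums to one, and the average reward computed through get_average_reward is expected to match the cached optimal_average_reward property:

import numpy as np
from colosseum.mdp.river_swim import RiverSwimContinuous  # assumed concrete subclass

mdp = RiverSwimContinuous(seed=42)
pi_star = mdp.get_optimal_policy(stochastic_form=True)

sd = mdp.get_stationary_distribution(pi_star)
print(np.isclose(sd.sum(), 1.0))
print(mdp.get_average_reward(pi_star), mdp.optimal_average_reward)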
transition_matrix_and_rewards: Tuple[numpy.ndarray, numpy.ndarray]
Returns
  • np.ndarray: The transition probabilities 3d numpy array.
  • np.ndarray: The reward matrix.
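A quick sanity check on the shapes (assuming RiverSwimContinuous is available, and assuming the conventional (state, action, next state) layout for the 3d transition array, which is not stated explicitly above):

import numpy as np
from colosseum.mdp.river_swim import RiverSwimContinuous  # assumed concrete subclass

mdp = RiverSwimContinuous(seed=42)
T, R = mdp.transition_matrix_and_rewards   # same arrays as the T and R aliases above
print(T.shape, R.shape)
print(np.allclose(T.sum(axis=-1), 1.0))    # under the assumed layout, transition probabilities sum to one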
graph_metrics: Dict[str, Any]
Returns
  • Dict[str, Any]: The dictionary containing graph metrics for the graph underlying the MDP.
diameter: float
Returns
  • float: The diameter of the MDP.
sum_reciprocals_suboptimality_gaps: float
Returns
  • float: The sum of the reciprocals of the suboptimality gaps.
discounted_value_norm: float
Returns
  • float: The environmental value norm in the discounted setting.
undiscounted_value_norm: float
Returns
  • float: The environmental value norm in the undiscounted setting.
value_norm

is an alias for the discounted value norm.

measures_of_hardness: Dict[str, float]
Returns
  • Dict[str, float]: The dictionary containing all the available measures of hardness.
summary: Dict[str, Dict[str, Any]]
Returns
  • Dict[str, Dict[str, Any]]: The dictionary with information about the parameters, the measures of hardness and the graph metrics.
hardness_report: Optional[Dict]

looks for a cached hardness report in the folder that was given in input when the MDP was created. It returns None if no report is found.

1151    def get_info_class(self, n: "NODE_TYPE") -> NodeInfoClass:
1152        """
1153        Returns
1154        -------
1155        NodeInfoClass
1156            The container class (NodeInfoClass) associated with state n.
1157        """
1158        return self.G.nodes[n]["info_class"]
Returns
  • NodeInfoClass: The container class (NodeInfoClass) associated with state n.
1160    def get_transition_distributions(
1161        self, node: "NODE_TYPE"
1162    ) -> Dict[int, Union[rv_continuous, NextStateSampler]]:
1163        """
1164        Returns
1165        -------
1166        Dict[int, Union[rv_continuous, NextStateSampler]]
1167            The dictionary containing the transition distributions for any action at the state given in input.
1168        """
1169        return self.get_info_class(node).transition_distributions
Returns
  • Dict[int, Union[rv_continuous, NextStateSampler]]: The dictionary containing the transition distributions for any action at the state given in input.
1171    def get_reward_distribution(
1172        self, node: "NODE_TYPE", action: "ACTION_TYPE", next_node: "NODE_TYPE"
1173    ) -> rv_continuous:
1174        """
1175        Returns
1176        -------
1177        rv_continuous
1178            The reward distribution for transitioning in next_node when selecting action from state.
1179        """
1180        if (node, action, next_node) not in self._cached_reward_distributions:
1181            self._cached_reward_distributions[
1182                node, action, next_node
1183            ] = self._get_reward_distribution(
1184                node, self._inverse_action_mapping(node, action), next_node
1185            )
1186        return self._cached_reward_distributions[node, action, next_node]
Returns
  • rv_continuous: The reward distribution for transitioning in next_node when selecting action from state.
1188    def sample_reward(
1189        self, node: "NODE_TYPE", action: "ACTION_TYPE", next_node: "NODE_TYPE"
1190    ) -> float:
1191        """
1192        Returns
1193        -------
1194        float
1195            A sample from the reward distribution when transitioning in next_node when selecting action from state.
1196        """
1197        if (node, action, next_node) not in self._cached_rewards or len(
1198            self._cached_rewards[node, action, next_node]
1199        ) == 0:
1200            self._cached_rewards[node, action, next_node] = (
1201                self.get_reward_distribution(node, action, next_node)
1202                .rvs(5000, random_state=self._rng)
1203                .tolist()
1204            )
1205        r = self._cached_rewards[node, action, next_node].pop(0)
1206        return (
1207            r * (self.rewards_range[1] - self.rewards_range[0]) - self.rewards_range[0]
1208        )
Returns
  • float: A sample from the reward distribution when transitioning in next_node when selecting action from state.
def get_measure_from_name(self, measure_name: str) -> float:
1210    def get_measure_from_name(self, measure_name: str) -> float:
1211        """
1212        Parameters
1213        ----------
1214        measure_name : str
1215            the code name for the measure. The available code names can be obtained using the
1216            ``BaseMDP.get_available_hardness_measures`` function.
1217        Returns
1218        -------
1219        float
1220            The value for the hardness measure.
1221        """
1222        if measure_name == "diameter":
1223            return self.diameter
1224        elif measure_name in ["value_norm", "environmental_value_norm"]:
1225            return self.value_norm
1226        elif measure_name == "suboptimal_gaps":
1227            return self.sum_reciprocals_suboptimality_gaps
1228        else:
1229            raise ValueError(
1230                f"{measure_name} is not a valid hardness measure name: available ones are "
1231                + str(self.get_available_hardness_measures())
1232            )
Parameters
  • measure_name (str): the code name for the measure. The available code names can be obtained using the ``BaseMDP.get_available_hardness_measures`` function.
Returns
  • float: The value for the hardness measure.
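The code names accepted here are exactly those returned by get_available_hardness_measures, and the same values are exposed as cached properties; a minimal sketch (assuming RiverSwimContinuous is available):

from colosseum.mdp.river_swim import RiverSwimContinuous  # assumed concrete subclass

mdp = RiverSwimContinuous(seed=42)
for name in mdp.get_available_hardness_measures():
    print(name, mdp.get_measure_from_name(name))

print(mdp.diameter, mdp.value_norm, mdp.sum_reciprocals_suboptimality_gaps)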
def action_spec(self) -> dm_env.specs.DiscreteArray:
1234    def action_spec(self) -> DiscreteArray:
1235        """
1236        Returns
1237        -------
1238        DiscreteArray
1239            The action spec of the environment.
1240        """
1241        return DiscreteArray(self.n_actions, name="action")
Returns
  • DiscreteArray: The action spec of the environment.
def observation_spec(self) -> dm_env.specs.Array:
1243    def observation_spec(self) -> Array:
1244        """
1245        Returns
1246        -------
1247        Array
1248            The observation spec of the environment.
1249        """
1250        if self.emission_map is None:
1251            return DiscreteArray(self.n_states, name="observation")
1252        obs = self.get_observation(self.starting_nodes[0], 0)
1253        return BoundedArray(obs.shape, obs.dtype, -np.inf, np.inf, "observation")
Returns
  • Array: The observation spec of the environment.
1255    def get_observation(
1256        self, node: "NODE_TYPE", h: int = None
1257    ) -> Union[int, np.ndarray]:
1258        """
1259        Returns
1260        -------
1261        Union[int, np.ndarray]
1262            The representation corresponding to the state given in input. Episodic MDPs also require the current
1263        in-episode time step.
1264        """
1265        if self.emission_map is None:
1266            return self.node_to_index[self.cur_node]
1267        return self.emission_map.get_observation(node, h)
Returns
  • Union[int, np.ndarray]: The representation corresponding to the state given in input. Episodic MDPs also require the current in-episode time step.
def reset(self) -> dm_env._environment.TimeStep:
1269    def reset(self) -> dm_env.TimeStep:
1270        """
1271        resets the environment to a newly sampled starting state.
1272        """
1273        self.necessary_reset = False
1274        self.h = 0
1275        self.cur_node = self.last_starting_node = self._starting_node_sampler.sample()
1276        node_info_class = self.get_info_class(self.cur_node)
1277        node_info_class.update_visitation_counts()
1278        return dm_env.restart(self.get_observation(self.cur_node, self.h))

resets the environment to a newly sampled starting state.

def step(self, action: int, auto_reset=False) -> dm_env._environment.TimeStep:
1280    def step(self, action: int, auto_reset=False) -> dm_env.TimeStep:
1281        """
1282        takes a step in the MDP for the given action. When auto_reset is set to True, the MDP is automatically
1283        reset at the end of an episode.
1284        Returns
1285        -------
1286        dm_env.TimeStep
1287            The TimeStep object containing the transition information.
1288        """
1289        if auto_reset and self.necessary_reset:
1290            return self.reset()
1291        assert not self.necessary_reset
1292
1293        # This can be the in-episode time step (episodic setting) or the total number of time steps (continuous setting)
1294        self.h += 1
1295
1296        # In case the action is a numpy array
1297        action = int(action)
1298
1299        # Moving the current state according to the action played
1300        old_node = self.cur_node
1301        self.cur_node = self.get_info_class(old_node).sample_next_state(action)
1302        self.last_edge = old_node, self.cur_node
1303        node_info_class = self.get_info_class(self.cur_node)
1304        node_info_class.update_visitation_counts(action)
1305
1306        # Calculate reward and observation
1307        reward = self.sample_reward(old_node, action, self.cur_node)
1308        observation = self.get_observation(self.cur_node, self.h)
1309
1310        # Wrapping the time step in a dm_env.TimeStep
1311        if self.is_episodic() and self.h >= self.H:
1312            self.necessary_reset = True
1313            if self.emission_map is None:
1314                observation = -1
1315            else:
1316                observation = np.zeros_like(self.observation_spec().generate_value())
1317            return dm_env.termination(reward=reward, observation=observation)
1318        return dm_env.transition(reward=reward, observation=observation)

takes a step in the MDP for the given action. When auto_reset is set to True, the MDP is automatically reset at the end of an episode.

Returns
  • dm_env.TimeStep: The TimeStep object containing the transition information.
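Since BaseMDP implements the dm_env.Environment interface, the usual reset/step interaction loop applies; a minimal sketch with random actions (assuming RiverSwimContinuous is available):

import numpy as np
from colosseum.mdp.river_swim import RiverSwimContinuous  # assumed concrete subclass

mdp = RiverSwimContinuous(seed=42)
rng = np.random.RandomState(0)

ts = mdp.reset()
ret = 0.0
for _ in range(1000):
    action = rng.randint(mdp.action_spec().num_values)
    ts = mdp.step(action, auto_reset=True)
    if ts.reward is not None:               # the restart time step carries no reward
        ret += ts.reward
print(ret)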
def random_steps( self, n: int, auto_reset=False) -> List[Tuple[dm_env._environment.TimeStep, int]]:
1320    def random_steps(
1321        self, n: int, auto_reset=False
1322    ) -> List[Tuple[dm_env.TimeStep, int]]:
1323        """
1324        takes n steps with random actions and returns both the resulting time steps and the random actions. If
1325        auto_reset is set to True, then episodic MDPs are automatically reset.
1326
1327        Returns
1328        -------
1329        dm_env.TimeStep
1330            The TimeStep object containing the transition information.
1331        int
1332            The action performed.
1333        """
1334
1335        data = []
1336        for _ in range(n):
1337            action = int(self._rng.randint(self.action_spec().num_values))
1338            ts = self.step(action, auto_reset)
1339            data.append((ts, action))
1340        return data

takes n steps with random actions and returns both the resulting time steps and the random actions. If auto_reset is set to True, then episodic MDPs are automatically reset.

Returns
  • dm_env.TimeStep: The TimeStep object containing the transition information (one per step).
  • int: The action performed (one per step).
def random_step(self, auto_reset=False) -> Tuple[dm_env._environment.TimeStep, int]:
1342    def random_step(self, auto_reset=False) -> Tuple[dm_env.TimeStep, int]:
1343        """
1344        takes a step with a random action and returns both the next time step and the random action. If auto_reset
1345        is set to True, then episodic MDPs are automatically reset.
1346
1347        Returns
1348        -------
1349        dm_env.TimeStep
1350            The TimeStep object containing the transition information.
1351        int
1352            The action performed.
1353        """
1354        action = int(self._rng.randint(self.action_spec().num_values))
1355        ts = self.step(action, auto_reset)
1356        return ts, action

takes a step with a random action and returns both the next time step and the random action. If auto_reset is set to True, then episodic MDPs are automatically reset.

Returns
  • dm_env.TimeStep: The TimeStep object containing the transition information.
  • int: The action performed.
1358    def get_visitation_counts(
1359        self, state_only=True
1360    ) -> Dict[Union["NODE_TYPE", Tuple["NODE_TYPE", "ACTION_TYPE"]], int]:
1361        """
1362        when state_only is True it returns the visitation counts for the states and when it is False it returns the
1363        visitation counts for state action pairs.
1364        """
1365        if state_only:
1366            return {
1367                node: self.get_info_class(node).state_visitation_count
1368                for node in self.G.nodes
1369            }
1370        return {
1371            (node, a): self.get_info_class(node).actions_visitation_count[a]
1372            for node in self.G.nodes
1373            for a in range(self.n_actions)
1374        }

when state_only is True it returns the visitation counts for the states and when it is False it returns the visitation counts for state action pairs.
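A short sketch of inspecting visitation counts after a few random steps (assuming RiverSwimContinuous is available):

from colosseum.mdp.river_swim import RiverSwimContinuous  # assumed concrete subclass

mdp = RiverSwimContinuous(seed=42)
mdp.reset()
mdp.random_steps(200, auto_reset=True)

state_counts = mdp.get_visitation_counts(state_only=True)     # node -> visit count
sa_counts = mdp.get_visitation_counts(state_only=False)       # (node, action) -> visit count
print(max(state_counts.values()), sum(sa_counts.values()))
mdp.reset_visitation_counts()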

def reset_visitation_counts(self):
1376    def reset_visitation_counts(self):
1377        """
1378        resets the visitation counts to zero for all states and state action pairs.
1379        """
1380        for node in self.G.nodes:
1381            self.get_info_class(node).state_visitation_count = 0
1382            for a in range(self.n_actions):
1383                self.get_info_class(node).actions_visitation_count[a] = 0

resets the visitation counts to zero for all states and state action pairs.

1385    def get_value_node_labels(
1386        self: Union["ContinuousMDP", "EpisodicMDP"], V: np.ndarray = None
1387    ) -> Dict["NODE_TYPE", float]:
1388        """
1389        returns a mapping from state to state values. By default, it uses the optimal values.
1390        """
1391        if V is None:
1392            _, V = self.optimal_value_functions
1393        else:
1394            if isinstance(self, EpisodicMDP):
1395                h, d = V.shape
1396                assert h == self.H and d == self.n_states
1397            else:
1398                assert len(V) == self.n_states
1399        return {
1400            node: np.round(
1401                V[0, self.node_to_index[node]]
1402                if self.is_episodic()
1403                else V[self.node_to_index[node]],
1404                2,
1405            )
1406            for node in self.G.nodes
1407        }

returns a mapping from state to state values. By default, it uses the optimal values.

Inherited Members
dm_env._environment.Environment
reward_spec
discount_spec
close