colosseum.mdp.base
import abc
import random
import sys
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Tuple, Type, Union

import dm_env
import networkx as nx
import numpy as np
import yaml
from dm_env.specs import BoundedArray, Array
from pydtmc import MarkovChain
from scipy.stats import rv_continuous

from colosseum import config
from colosseum.dynamic_programming import discounted_value_iteration
from colosseum.dynamic_programming.infinite_horizon import discounted_policy_evaluation
from colosseum.dynamic_programming.utils import get_policy_from_q_values
from colosseum.emission_maps import EmissionMap
from colosseum.emission_maps import Tabular
from colosseum.hardness.measures import calculate_norm_average
from colosseum.hardness.measures import calculate_norm_discounted
from colosseum.hardness.measures import find_hardness_report_file
from colosseum.hardness.measures import get_diameter
from colosseum.hardness.measures import get_sum_reciprocals_suboptimality_gaps
from colosseum.mdp.utils.communication_class import MDPCommunicationClass
from colosseum.mdp.utils.communication_class import get_communication_class
from colosseum.mdp.utils.communication_class import get_recurrent_nodes_set
from colosseum.mdp.utils.custom_samplers import NextStateSampler
from colosseum.mdp.utils.markov_chain import get_average_rewards, get_markov_chain
from colosseum.mdp.utils.markov_chain import get_stationary_distribution
from colosseum.mdp.utils.markov_chain import get_transition_probabilities
from colosseum.mdp.utils.mdp_creation import NodeInfoClass
from colosseum.mdp.utils.mdp_creation import get_transition_matrix_and_rewards
from colosseum.mdp.utils.mdp_creation import instantiate_transitions
from colosseum.utils import clean_for_storing
from colosseum.utils.acme.specs import DiscreteArray
from colosseum.utils.formatter import clean_for_file_path

if TYPE_CHECKING:
    from colosseum.mdp import ACTION_TYPE, NODE_TYPE, ContinuousMDP, EpisodicMDP

sys.setrecursionlimit(5000)


class BaseMDP(dm_env.Environment, abc.ABC):
    """
    The base class for MDPs.
    """

    @staticmethod
    @abc.abstractmethod
    def get_unique_symbols() -> List[str]:
        """
        Returns
        -------
        List[str]
            the unique symbols of the grid representation of the MDP.
        """

    @staticmethod
    def get_available_hardness_measures() -> List[str]:
        """
        Returns
        -------
        List[str]
            The hardness measures available in the package.
        """
        return ["diameter", "value_norm", "suboptimal_gaps"]

    @staticmethod
    def produce_gin_file_from_mdp_parameters(
        parameters: Dict[str, Any], mdp_class_name: str, index: int = 0
    ) -> str:
        """
        Parameters
        ----------
        parameters : Dict[str, Any]
            The parameters of the MDP.
        mdp_class_name : str
            The name of the class of the MDP.
        index : int
            The index for the gin configuration.

        Returns
        -------
        str
            The gin configuration for the given parameters and index.
        """

        string = ""
        for k, v in parameters.items():
            string += f"prms_{index}/{mdp_class_name}.{k} = {v}\n"
        return string

    @staticmethod
    @abc.abstractmethod
    def does_seed_change_MDP_structure() -> bool:
        """
        Returns
        -------
        bool
            True if changing the seed changes the transition matrix and/or the rewards matrix. This may happen,
            for example, when there are fewer starting states than possible ones and the effective starting states
            are picked randomly based on the seed.
        """

    @staticmethod
    @abc.abstractmethod
    def is_episodic() -> bool:
        """
        Returns
        -------
        bool
            True if the MDP is episodic.
        """

    @staticmethod
    @abc.abstractmethod
    def sample_parameters(n: int, seed: int = None) -> List[Dict[str, Any]]:
        """
        Returns
        -------
        List[Dict[str, Any]]
            n sampled parameters that can be used to construct an MDP in a reasonable amount of time.
        """

    @staticmethod
    @abc.abstractmethod
    def sample_mdp_parameters(
        n: int, is_episodic: bool, seed: int = None
    ) -> List[Dict[str, Any]]:
        """
        Returns
        -------
        List[Dict[str, Any]]
            n sampled parameters that can be used to construct an MDP in a reasonable amount of time.
        """

    @staticmethod
    @abc.abstractmethod
    def get_node_class() -> Type["NODE_TYPE"]:
        """
        Returns
        -------
        Type["NODE_TYPE"]
            The class of the nodes of the MDP.
        """

    @property
    @abc.abstractmethod
    def n_actions(self) -> int:
        """
        Returns
        -------
        int
            The number of available actions.
        """

    @abc.abstractmethod
    def _get_next_nodes_parameters(
        self, node: "NODE_TYPE", action: "ACTION_TYPE"
    ) -> Tuple[Tuple[dict, float], ...]:
        """
        Returns
        -------
        Tuple[Tuple[dict, float], ...]
            The parameters of the possible next nodes reachable from the given state and action.
        """

    @abc.abstractmethod
    def _get_reward_distribution(
        self, node: "NODE_TYPE", action: "ACTION_TYPE", next_node: "NODE_TYPE"
    ) -> rv_continuous:
        """
        Returns
        -------
        rv_continuous
            The distribution over rewards in the zero-one interval when transitioning from a given state and action
            to a given next state. Note that rescaling the rewards to a different range is handled separately.
        """

    @abc.abstractmethod
    def _get_starting_node_sampler(self) -> NextStateSampler:
        """
        Returns
        -------
        NextStateSampler
            The sampler over next states that corresponds to the starting state distribution.
        """

    @abc.abstractmethod
    def _check_parameters_in_input(self):
        """
        checks whether the parameters given in input can produce a correct MDP instance.
        """
        assert self._p_rand is None or (0 < self._p_rand < 0.9999)
        assert self._p_lazy is None or (0 < self._p_lazy < 0.9999)

    @abc.abstractmethod
    def get_grid_representation(self, node: "NODE_TYPE", h: int = None) -> np.ndarray:
        """
        Returns
        -------
        np.ndarray
            An ASCII representation of the state given in input stored as a numpy array.
        """

    @abc.abstractmethod
    def _get_grid_representation(self, node: "NODE_TYPE") -> np.ndarray:
        """
        Returns
        -------
        np.ndarray
            An ASCII representation of the state given in input stored as a numpy array.
        """

    @property
    @abc.abstractmethod
    def _possible_starting_nodes(self) -> List["NODE_TYPE"]:
        """
        Returns
        -------
        List["NODE_TYPE"]
            The list containing all the possible starting states for this MDP instance.
        """

    @property
    @abc.abstractmethod
    def parameters(self) -> Dict[str, Any]:
        """
        Returns
        -------
        Dict[str, Any]
            The parameters of the MDP.
        """
        return dict(
            seed=self._seed,
            randomize_actions=self._randomize_actions,
            p_lazy=self._p_lazy,
            p_rand=self._p_rand,
            rewards_range=self._rewards_range,
            make_reward_stochastic=self._make_reward_stochastic,
            reward_variance_multiplier=self._reward_variance_multiplier,
        )

    @abc.abstractmethod
    def get_gin_parameters(self, index: int) -> str:
        """
        Returns
        -------
        str
            The gin config of the MDP instance.
        """

    def get_gin_config(self, index: int) -> str:
        """
        Returns
        -------
        str
            The gin config file of the MDP.
        """
        return "".join(self.get_gin_parameters(index))

    def get_node_labels(self, l: List[Any]) -> Dict["NODE_TYPE", Any]:
        """
        Returns
        -------
        Dict["NODE_TYPE", Any]
            A dictionary that assigns to each node the value of the list at the corresponding index.
        """
        assert len(l) == self.n_states, (
            f"The list in input should have the same length as the number of states {self.n_states}, "
            f"got a list of length {len(l)} instead."
        )
        return {self.index_to_node[i]: l[i] for i in range(len(l))}

    def get_node_action_labels(
        self, l: List[List[Any]]
    ) -> Dict[Tuple["NODE_TYPE", "ACTION_TYPE"], Any]:
        """
        Returns
        -------
        Dict[Tuple["NODE_TYPE", "ACTION_TYPE"], Any]
            A dictionary that assigns to each node and action the value of the list at the corresponding index.
        """
        assert len(l) == self.n_states, (
            f"The list in input should have the same length as the number of states {self.n_states}, "
            f"got a list of length {len(l)} instead."
        )
        assert all(
            len(ll) == self.n_actions for ll in l
        ), f"The lists inside the list should have the same length as the number of actions {self.n_actions}."
        return {
            (self.index_to_node[i], action): l[i][action]
            for i in range(len(l))
            for action in range(self.n_actions)
        }

    @property
    def hash(self) -> str:
        """
        Returns
        -------
        str
            The hash value based on the parameters of the MDP. This can be used to create cache files.
        """
        s = "_".join(map(str, clean_for_storing(list(self.parameters.values()))))
        return f"mdp_{type(self).__name__}_" + clean_for_file_path(s)

    def __str__(self) -> str:
        """
        Returns
        -------
        str
            The string containing all the information about the MDP, including parameters, measures of hardness and
            graph metrics.
        """
        string = type(self).__name__ + "\n"
        m_l = 0
        for k, v in self.summary.items():
            m_l = max(m_l, len(max(v.keys(), key=len)) + 4)
        for k, v in self.summary.items():
            string += "\t" + k + "\n"
            for kk, vv in v.items():
                string += f"\t\t{kk}{' ' * (m_l - len(kk))}:\t{vv}\n"
        return string

    @abc.abstractmethod
    def __init__(
        self: Union["EpisodicMDP", "ContinuousMDP"],
        seed: int,
        randomize_actions: bool = True,
        make_reward_stochastic=False,
        reward_variance_multiplier: float = 1.0,
        p_lazy: float = None,
        p_rand: float = None,
        rewards_range: Tuple[float, float] = (0.0, 1.0),
        emission_map: Type[EmissionMap] = Tabular,
        emission_map_kwargs: Dict[str, Any] = dict(),
        noise: Type[EmissionMap] = None,
        noise_kwargs: Dict[str, Any] = dict(),
        instantiate_mdp: bool = True,
        force_sparse_transition: bool = False,
        exclude_horizon_from_parameters: bool = False,
    ):
        """
        Parameters
        ----------
        seed : int
            The random seed.
        randomize_actions : bool
            If True, the outcomes of actions at different states are randomized. By default, it is set to True.
        make_reward_stochastic : bool
            If True, the rewards of the MDP will be stochastic. By default, it is set to False.
        reward_variance_multiplier : float
            A multiplier for the variance of the rewards that keeps the mean fixed. The lower the value, the higher
            the variance. By default, it is set to 1.
        p_lazy : float
            The probability that the MDP stays in the same state instead of executing the action selected by the
            agent. By default, it is set to zero.
        p_rand : float
            The probability of selecting an action at random instead of the one specified by the agent. By default,
            it is set to zero.
        rewards_range : Tuple[float, float]
            The maximum range of values for the rewards. By default, it is `(0, 1)`.
        emission_map : Type[EmissionMap]
            The emission map that renders the MDP non-tabular. By default, no emission map is used.
        emission_map_kwargs : Dict[str, Any]
            The keyword arguments for the emission map.
        noise : Type[EmissionMap]
            The noise class to make the emission map stochastic. By default, the emission map is left deterministic.
        noise_kwargs : Dict[str, Any]
            The keyword arguments for the noise.
        instantiate_mdp : bool
            If True, the underlying graph for the MDP is immediately instantiated. By default, it is set to True.
        force_sparse_transition : bool
            If True, the transition matrix is forced to be stored in a sparse matrix format. By default, it is not
            enforced.
        exclude_horizon_from_parameters : bool
            If True, in the episodic case, the horizon is not considered a parameter when computing the `parameters`
            property and the hash. By default, it is not excluded.
        """

        # MDP generic variables
        self._seed = seed
        self._randomize_actions = randomize_actions
        self._make_reward_stochastic = make_reward_stochastic
        self._reward_variance_multiplier = reward_variance_multiplier
        self._hardness_reports_folder = config.get_hardness_measures_cache_folder()
        self._force_sparse_transition = force_sparse_transition
        self._exclude_horizon_from_parameters = exclude_horizon_from_parameters
        self._p_rand = p_rand if p_rand is None or p_rand > 0.0 else None
        self._p_lazy = p_lazy if p_lazy is None or p_lazy > 0.0 else None
        self.rewards_range = self._rewards_range = (
            rewards_range
            if rewards_range[0] < rewards_range[1]
            else rewards_range[::-1]
        )
        self._are_all_rewards_deterministic = True
        self._are_all_transition_deterministic = True
        self.r_min, self.r_max = self.rewards_range

        # MDP loop variables
        self._hr = None
        self.cur_node = None
        self.last_edge = None
        self.last_starting_node = None
        self.is_reset_necessary = True
        self.current_timestep = 0
        self.h = 0
        self._rng = np.random.RandomState(seed)
        self._fast_rng = random.Random(seed)

        # Caching variables. Note that we cannot use lru_cache because it prevents the objects from being deleted.
        self._cached_rewards = dict()
        self._cached_reward_distributions = dict()
        self._action_mapping = dict()
        self._communication_class = None
        self._recurrent_nodes_set = None
        if not hasattr(self, "_transition_matrix_and_rewards"):
            self._transition_matrix_and_rewards = None
        self._graph_layout = None
        self._graph_metrics = None
        self._summary = None
        self._diameter = None
        self._sum_reciprocals_suboptimality_gaps = None
        self._optimal_value_norm = dict()
        self._optimal_value = None
        self._worst_value = None
        self._random_value = None
        self._optimal_policy = dict()
        self._worst_policy = dict()
        self._otp = None
        self._omc = None
        self._osd = None
        self._oars = None
        self._oar = None
        self._wtp = None
        self._wmc = None
        self._wsd = None
        self._wars = None
        self._war = None
        self._rtp = None
        self._rmc = None
        self._rsd = None
        self._rars = None
        self._rar = None

        if instantiate_mdp:
            self.instantiate_MDP()

        # Emission map
        if emission_map == Tabular or emission_map is None:
            self.emission_map = None
            self.is_tabular = True
        else:
            noise_kwargs["seed"] = seed
            self.emission_map = emission_map(
                self,
                noise_class=noise,
                noise_kwargs=noise_kwargs,
                **emission_map_kwargs,
            )
            self.is_tabular = self.emission_map.is_tabular

    def instantiate_MDP(self):
        """
        recursively instantiates the MDP.
        """
        # Instantiating the MDP
        self._check_parameters_in_input()
        self._starting_node_sampler = self._get_starting_node_sampler()
        self.starting_nodes = self._starting_node_sampler.next_nodes
        self.G = nx.DiGraph()
        self._instantiate_mdp()
        self.n_states = len(self.G.nodes)

        # Some shortcut variables
        if not self.is_episodic():
            self._vi, self._pe = (
                discounted_value_iteration,
                discounted_policy_evaluation,
            )
            self.random_policy = (
                np.ones((self.n_states, self.n_actions), dtype=np.float32)
                / self.n_actions
            )

        # Cache state to index mapping
        mapping = self._rng.rand(self.n_states, self.n_actions).argsort(1)
        self.node_to_index = dict()
        self.index_to_node = dict()
        for i, node in enumerate(self.G.nodes):
            self.node_to_index[node] = i
            self.index_to_node[i] = node

        # Compute the starting state distribution
        self.starting_state_distribution = np.zeros(self.n_states)
        self.starting_states = []
        for n, p in self._starting_node_sampler.next_nodes_and_probs:
            s = self.node_to_index[n]
            self.starting_state_distribution[s] = p
            self.starting_states.append(s)
        self.starting_states_and_probs = list(
            zip(self.starting_states, self._starting_node_sampler.probs)
        )

    def _get_action_mapping(self, node: "NODE_TYPE") -> Tuple["ACTION_TYPE", ...]:
        """
        Returns
        -------
        Tuple["ACTION_TYPE", ...]
            The random action mapping for the state given in input.
        """
        if node not in self._action_mapping:
            self._action_mapping[node] = (
                self._rng.rand(self.n_actions).argsort().tolist()
                if self._randomize_actions
                else list(range(self.n_actions))
            )
        return self._action_mapping[node]

    def _inverse_action_mapping(
        self, node: "NODE_TYPE", action: "ACTION_TYPE"
    ) -> "ACTION_TYPE":
        """
        Returns
        -------
        ACTION_TYPE
            The effective action corresponding to the action given in input in the given state. In other words, it
            reverses the random action mapping.
        """
        return self._get_action_mapping(node)[action]

    def _produce_random_seed(self) -> int:
        """
        Returns
        -------
        int
            A new random seed that can be used for internal objects.
        """
        return self._fast_rng.randint(0, 10_000)

    def _instantiate_mdp(self):
        """
        recursively instantiates the MDP.
        """
        for sn in self.starting_nodes:
            instantiate_transitions(self, sn)

    @property
    def T(self) -> np.ndarray:
        """
        is an alias for the transition matrix.
        """
        return self.transition_matrix_and_rewards[0]

    @property
    def R(self) -> np.ndarray:
        """
        is an alias for the rewards matrix.
        """
        return self.transition_matrix_and_rewards[1]

    @property
    def recurrent_nodes_set(self) -> Iterable["NODE_TYPE"]:
        """
        Returns
        -------
        Iterable["NODE_TYPE"]
            The recurrent states set.
        """
        if self._recurrent_nodes_set is None:
            self._recurrent_nodes_set = get_recurrent_nodes_set(
                self.communication_class, self.G
            )
        return self._recurrent_nodes_set

    @property
    def communication_class(
        self: Union["EpisodicMDP", "ContinuousMDP"]
    ) -> MDPCommunicationClass:
        """
        Returns
        -------
        MDPCommunicationClass
            The communication class.
        """
        if self._communication_class is None:
            self._communication_class = get_communication_class(
                self.T, self.get_episodic_graph(True) if self.is_episodic() else self.G
            )
        return self._communication_class

    def get_optimal_policy(self, stochastic_form: bool) -> np.ndarray:
        """
        Returns
        -------
        np.ndarray
            The optimal policy. It can be given either in the stochastic form, i.e. as a Dirac probability
            distribution, or as a simple mapping to integers.
        """
        if stochastic_form not in self._optimal_policy:
            self._optimal_policy[stochastic_form] = get_policy_from_q_values(
                self.optimal_value_functions[0], stochastic_form
            )
        return self._optimal_policy[stochastic_form]

    def get_worst_policy(self, stochastic_form) -> np.ndarray:
        """
        Returns
        -------
        np.ndarray
            The worst-performing policy. It can be given either in the stochastic form, i.e. as a Dirac probability
            distribution, or as a simple mapping to integers.
        """
        if stochastic_form not in self._worst_policy:
            self._worst_policy[stochastic_form] = get_policy_from_q_values(
                self._vi(self.T, -self.R)[0], stochastic_form
            )
        return self._worst_policy[stochastic_form]

    def get_value_functions(self, policy: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """
        Parameters
        ----------
        policy : np.ndarray
            a numpy array containing a continuous policy.

        Returns
        -------
        Q : np.ndarray
            the state-action value function.
        V : np.ndarray
            the state value function.
        """
        return self._pe(*self.transition_matrix_and_rewards, policy)

    @property
    def optimal_value_functions(self) -> Tuple[np.ndarray, np.ndarray]:
        """
        Returns
        -------
        Q : np.ndarray
            the optimal state-action value function.
        V : np.ndarray
            the optimal state value function.
        """
        if self._optimal_value is None:
            self._optimal_value = self._vi(*self.transition_matrix_and_rewards)
        return self._optimal_value

    @property
    def worst_value_functions(self) -> Tuple[np.ndarray, np.ndarray]:
        """
        Returns
        -------
        Q : np.ndarray
            the state-action value function of the worst policy.
        V : np.ndarray
            the state value function of the worst policy.
        """
        if self._worst_value is None:
            self._worst_value = self._pe(
                *self.transition_matrix_and_rewards, self.get_worst_policy(True)
            )
        return self._worst_value

    @property
    def random_value_functions(self) -> Tuple[np.ndarray, np.ndarray]:
        """
        Returns
        -------
        Q : np.ndarray
            the state-action value function of the random uniform policy.
        V : np.ndarray
            the state value function of the random uniform policy.
        """
        if self._random_value is None:
            self._random_value = self._pe(
                *self.transition_matrix_and_rewards, self.random_policy
            )
        return self._random_value

    @property
    def optimal_transition_probabilities(self) -> np.ndarray:
        """
        Returns
        -------
        np.ndarray
            The transition probabilities corresponding to the optimal policy.
        """
        if self._otp is None:
            T = self.T_cf if self.is_episodic() else self.T
            pi = (
                self.get_optimal_policy_continuous_form(True)
                if self.is_episodic()
                else self.get_optimal_policy(True)
            )
            self._otp = get_transition_probabilities(T, pi)
        return self._otp

    @property
    def worst_transition_probabilities(self) -> np.ndarray:
        """
        Returns
        -------
        np.ndarray
            The transition probabilities corresponding to the worst policy.
        """
        if self._wtp is None:
            T = self.T_cf if self.is_episodic() else self.T
            pi = (
                self.get_worst_policy_continuous_form(True)
                if self.is_episodic()
                else self.get_worst_policy(True)
            )
            self._wtp = get_transition_probabilities(T, pi)
        return self._wtp

    @property
    def random_transition_probabilities(self) -> np.ndarray:
        """
        Returns
        -------
        np.ndarray
            The transition probabilities corresponding to the random uniform policy.
        """
        if self._rtp is None:
            T = self.T_cf if self.is_episodic() else self.T
            pi = self.random_policy_cf if self.is_episodic() else self.random_policy
            self._rtp = get_transition_probabilities(T, pi)
        return self._rtp

    @property
    def optimal_markov_chain(self) -> MarkovChain:
        """
        Returns
        -------
        MarkovChain
            The Markov chain corresponding to the optimal policy.
        """
        if self._omc is None:
            self._omc = get_markov_chain(self.optimal_transition_probabilities)
        return self._omc

    @property
    def worst_markov_chain(self) -> MarkovChain:
        """
        Returns
        -------
        MarkovChain
            The Markov chain corresponding to the worst policy.
        """
        if self._wmc is None:
            self._wmc = get_markov_chain(self.worst_transition_probabilities)
        return self._wmc

    @property
    def random_markov_chain(self) -> MarkovChain:
        """
        Returns
        -------
        MarkovChain
            The Markov chain corresponding to the random uniform policy.
        """
        if self._rmc is None:
            self._rmc = get_markov_chain(self.random_transition_probabilities)
        return self._rmc

    def get_stationary_distribution(self, policy: np.ndarray) -> np.ndarray:
        """
        Parameters
        ----------
        policy : np.ndarray
            a numpy array containing a continuous policy.

        Returns
        -------
        np.ndarray
            the stationary distribution of the Markov chain yielded by the policy.
        """
        return get_stationary_distribution(
            get_transition_probabilities(self.T, policy),
            self.starting_states_and_probs,
        )

    @property
    def optimal_stationary_distribution(self) -> np.array:
        """
        Returns
        -------
        np.ndarray
            the stationary distribution of the Markov chain yielded by the optimal policy.
        """
        if self._osd is None:
            self._osd = get_stationary_distribution(
                self.optimal_transition_probabilities,
                self.starting_states_and_probs,
            )
        return self._osd

    @property
    def worst_stationary_distribution(self) -> np.array:
        """
        Returns
        -------
        np.ndarray
            the stationary distribution of the Markov chain yielded by the worst policy.
        """
        if self._wsd is None:
            self._wsd = get_stationary_distribution(
                self.worst_transition_probabilities,
                self.starting_states_and_probs,
            )
            assert not np.isnan(self._wsd).any()
        return self._wsd

    @property
    def random_stationary_distribution(self) -> np.array:
        """
        Returns
        -------
        np.ndarray
            the stationary distribution of the Markov chain yielded by the random uniform policy.
        """
        if self._rsd is None:
            self._rsd = get_stationary_distribution(
                self.random_transition_probabilities,
                None,
            )
        return self._rsd

    @property
    def optimal_average_rewards(self) -> np.ndarray:
        """
        Returns
        -------
        np.ndarray
            The average rewards obtained for each state when following the optimal policy.
        """
        if self._oars is None:
            R = self.R_cf if self.is_episodic() else self.R
            pi = (
                self.get_optimal_policy_continuous_form(True)
                if self.is_episodic()
                else self.get_optimal_policy(True)
            )
            self._oars = get_average_rewards(R, pi)
        return self._oars

    @property
    def worst_average_rewards(self) -> np.ndarray:
        """
        Returns
        -------
        np.ndarray
            The average rewards obtained for each state when following the worst policy.
        """
        if self._wars is None:
            R = self.R_cf if self.is_episodic() else self.R
            pi = (
                self.get_worst_policy_continuous_form(True)
                if self.is_episodic()
                else self.get_worst_policy(True)
            )
            self._wars = get_average_rewards(R, pi)
        return self._wars

    @property
    def random_average_rewards(self) -> np.ndarray:
        """
        Returns
        -------
        np.ndarray
            The average rewards obtained for each state when following the random uniform policy.
        """
        if self._rars is None:
            R = self.R_cf if self.is_episodic() else self.R
            pi = self.random_policy_cf if self.is_episodic() else self.random_policy
            self._rars = get_average_rewards(R, pi)
        return self._rars

    def get_average_reward(self, policy: np.ndarray) -> float:
        r"""
        Parameters
        ----------
        policy : np.ndarray
            a numpy array containing a continuous policy.

        Returns
        -------
        average_reward : float
            the expected time average reward, i.e.
            :math:`\underset{T\to \infty}{\text{lim inf}} \frac{1}{T} \mathbb{E}_\pi \sum_{t=0}^T R_t`,
            associated with the policy.
        """
        sd = self.get_stationary_distribution(policy)
        return sum(sd * get_average_rewards(self.R, policy))

    @property
    def optimal_average_reward(self) -> float:
        r"""
        Returns
        -------
        average_reward : float
            the expected time average reward, i.e.
            :math:`\underset{T\to \infty}{\text{lim inf}} \frac{1}{T} \mathbb{E}_\pi \sum_{t=0}^T R_t`,
            associated with the optimal policy.
        """
        if self._oar is None:
            self._oar = sum(
                self.optimal_stationary_distribution * self.optimal_average_rewards
            )
        return self._oar

    @property
    def worst_average_reward(self) -> float:
        r"""
        Returns
        -------
        average_reward : float
            the expected time average reward, i.e.
            :math:`\underset{T\to \infty}{\text{lim inf}} \frac{1}{T} \mathbb{E}_\pi \sum_{t=0}^T R_t`,
            associated with the worst policy.
        """
        if self._war is None:
            self._war = sum(
                self.worst_stationary_distribution * self.worst_average_rewards
            )
        return self._war

    @property
    def random_average_reward(self) -> float:
        r"""
        Returns
        -------
        average_reward : float
            the expected time average reward, i.e.
            :math:`\underset{T\to \infty}{\text{lim inf}} \frac{1}{T} \mathbb{E}_\pi \sum_{t=0}^T R_t`,
            associated with the random uniform policy.
        """
        if self._rar is None:
            self._rar = sum(
                self.random_stationary_distribution * self.random_average_rewards
            )
        return self._rar

    @property
    def transition_matrix_and_rewards(self) -> Tuple[np.ndarray, np.ndarray]:
        """
        Returns
        -------
        np.ndarray
            The transition probabilities 3d numpy array.
        np.ndarray
            The reward matrix.
        """
        if self._transition_matrix_and_rewards is None:
            self._transition_matrix_and_rewards = get_transition_matrix_and_rewards(
                self.n_states,
                self.n_actions,
                self.G,
                self.get_info_class,
                self.get_reward_distribution,
                self.node_to_index,
                self._force_sparse_transition,
            )
        return self._transition_matrix_and_rewards

    @property
    def graph_layout(self) -> Dict["NODE_TYPE", Tuple[float, float]]:
        """
        Returns
        -------
        Dict["NODE_TYPE", Tuple[float, float]]
            The graph layout for the MDP. It can be customized by implementing the `custom_graph_layout` function.
        """
        if self._graph_layout is None:
            self._graph_layout = (
                self.custom_graph_layout()
                if hasattr(self, "custom_graph_layout")
                else nx.nx_agraph.graphviz_layout(self.G)
            )
        return self._graph_layout

    @property
    def graph_metrics(self) -> Dict[str, Any]:
        """
        Returns
        -------
        Dict[str, Any]
            The dictionary containing graph metrics for the graph underlying the MDP.
        """
        if self._graph_metrics is None:
            G = self.get_episodic_graph(True) if self.is_episodic() else self.G
            self._graph_metrics = dict()
            self._graph_metrics["# nodes"] = len(G.nodes)
            self._graph_metrics["# edges"] = len(G.edges)
        return self._graph_metrics

    @property
    def diameter(self: Union["EpisodicMDP", "ContinuousMDP"]) -> float:
        """
        Returns
        -------
        float
            The diameter of the MDP.
        """
        if self._diameter is None:
            if self.hardness_report:
                self._diameter = self.hardness_report["MDP measure of hardness"][
                    "diameter"
                ]
            else:
                self._diameter = get_diameter(
                    self.episodic_transition_matrix_and_rewards[0]
                    if self.is_episodic()
                    else self.T,
                    self.is_episodic(),
                )
        return self._diameter

    @property
    def sum_reciprocals_suboptimality_gaps(
        self: Union["EpisodicMDP", "ContinuousMDP"]
    ) -> float:
        """
        Returns
        -------
        float
            The sum of the reciprocals of the sub-optimality gaps.
        """
        if self._sum_reciprocals_suboptimality_gaps is None:
            if self.hardness_report:
                self._sum_reciprocals_suboptimality_gaps = self.hardness_report[
                    "MDP measure of hardness"
                ]["suboptimal_gaps"]
            else:
                self._sum_reciprocals_suboptimality_gaps = (
                    get_sum_reciprocals_suboptimality_gaps(
                        *self.optimal_value_functions,
                        self.reachable_states if self.is_episodic() else None,
                    )
                )
        return self._sum_reciprocals_suboptimality_gaps

    def _compute_value_norm(self, discounted: bool) -> float:
        """
        Returns
        -------
        float
            The environmental value norm in the discounted or undiscounted setting.
        """
        T, R = (self.T_cf, self.R_cf) if self.is_episodic() else (self.T, self.R)
        V = (
            self.optimal_value_continuous_form[1]
            if self.is_episodic()
            else self.optimal_value_functions[1]
        )
        if discounted:
            return calculate_norm_discounted(T, V)
        return calculate_norm_average(
            T, self.optimal_transition_probabilities, self.optimal_average_rewards
        )

    @property
    def discounted_value_norm(self) -> float:
        """
        Returns
        -------
        float
            The environmental value norm in the discounted setting.
        """
        if True not in self._optimal_value_norm:
            if (
                self._are_all_transition_deterministic
                and self._are_all_rewards_deterministic
            ):
                self._optimal_value_norm[True] = 0.0
            elif self.hardness_report:
                self._optimal_value_norm[True] = self.hardness_report[
                    "MDP measure of hardness"
                ]["value_norm"]
            else:
                self._optimal_value_norm[True] = self._compute_value_norm(True)
        return self._optimal_value_norm[True]

    @property
    def undiscounted_value_norm(self) -> float:
        """
        Returns
        -------
        float
            The environmental value norm in the undiscounted setting.
        """
        if False not in self._optimal_value_norm:
            self._optimal_value_norm[False] = self._compute_value_norm(False)
        return self._optimal_value_norm[False]

    @property
    def value_norm(self):
        """
        is an alias for the discounted value norm.
        """
        return self.discounted_value_norm

    @property
    def measures_of_hardness(self) -> Dict[str, float]:
        """
        Returns
        -------
        Dict[str, float]
            The dictionary containing all the available measures of hardness.
        """
        return dict(
            diameter=self.diameter,
            suboptimal_gaps=self.sum_reciprocals_suboptimality_gaps,
            value_norm=self.value_norm,
        )

    @property
    def summary(self) -> Dict[str, Dict[str, Any]]:
        """
        Returns
        -------
        Dict[str, Dict[str, Any]]
            The dictionary with information about the parameters, the measures of hardness and the graph metrics.
        """
        if self._summary is None:
            self._summary = {
                "Parameters": clean_for_storing(self.parameters),
                "Measure of hardness": clean_for_storing(self.measures_of_hardness),
                "Graph metrics": clean_for_storing(self.graph_metrics),
            }
        return self._summary

    @property
    def hardness_report(self) -> Union[Dict, None]:
        """
        looks for a cached hardness report in the folder given in input when the MDP was created. It returns None
        if it is not able to find it.
        """
        if self._hr is None:
            report_file = find_hardness_report_file(self, self._hardness_reports_folder)
            if report_file:
                with open(report_file, "r") as f:
                    report = yaml.load(f, yaml.Loader)
                self._hr = report
            else:
                self._hr = False
        if self._hr:
            return self._hr
        return None

    def get_info_class(self, n: "NODE_TYPE") -> NodeInfoClass:
        """
        Returns
        -------
        NodeInfoClass
            The container class (NodeInfoClass) associated with state n.
        """
        return self.G.nodes[n]["info_class"]

    def get_transition_distributions(
        self, node: "NODE_TYPE"
    ) -> Dict[int, Union[rv_continuous, NextStateSampler]]:
        """
        Returns
        -------
        Dict[int, Union[rv_continuous, NextStateSampler]]
            The dictionary containing the transition distributions for any action at the state given in input.
        """
        return self.get_info_class(node).transition_distributions

    def get_reward_distribution(
        self, node: "NODE_TYPE", action: "ACTION_TYPE", next_node: "NODE_TYPE"
    ) -> rv_continuous:
        """
        Returns
        -------
        rv_continuous
            The reward distribution for transitioning to next_node when selecting action from node.
        """
        if (node, action, next_node) not in self._cached_reward_distributions:
            self._cached_reward_distributions[
                node, action, next_node
            ] = self._get_reward_distribution(
                node, self._inverse_action_mapping(node, action), next_node
            )
        return self._cached_reward_distributions[node, action, next_node]

    def sample_reward(
        self, node: "NODE_TYPE", action: "ACTION_TYPE", next_node: "NODE_TYPE"
    ) -> float:
        """
        Returns
        -------
        float
            A sample from the reward distribution when transitioning to next_node when selecting action from node.
        """
        if (node, action, next_node) not in self._cached_rewards or len(
            self._cached_rewards[node, action, next_node]
        ) == 0:
            self._cached_rewards[node, action, next_node] = (
                self.get_reward_distribution(node, action, next_node)
                .rvs(5000, random_state=self._rng)
                .tolist()
            )
        r = self._cached_rewards[node, action, next_node].pop(0)
        # Rescale the sample from the [0, 1] interval to the rewards range.
        return (
            r * (self.rewards_range[1] - self.rewards_range[0]) + self.rewards_range[0]
        )

    def get_measure_from_name(self, measure_name: str) -> float:
        """
        Parameters
        ----------
        measure_name : str
            the code name for the measure. The available code names can be obtained using the
            ``BaseMDP.get_available_hardness_measures`` function.

        Returns
        -------
        float
            The value for the hardness measure.
        """
        if measure_name == "diameter":
            return self.diameter
        elif measure_name in ["value_norm", "environmental_value_norm"]:
            return self.value_norm
        elif measure_name == "suboptimal_gaps":
            return self.sum_reciprocals_suboptimality_gaps
        else:
            raise ValueError(
                f"{measure_name} is not a valid hardness measure name: available ones are "
                + str(self.get_available_hardness_measures())
            )

    def action_spec(self) -> DiscreteArray:
        """
        Returns
        -------
        DiscreteArray
            The action spec of the environment.
        """
        return DiscreteArray(self.n_actions, name="action")

    def observation_spec(self) -> Array:
        """
        Returns
        -------
        Array
            The observation spec of the environment.
        """
        if self.emission_map is None:
            return DiscreteArray(self.n_states, name="observation")
        obs = self.get_observation(self.starting_nodes[0], 0)
        return BoundedArray(obs.shape, obs.dtype, -np.inf, np.inf, "observation")

    def get_observation(
        self, node: "NODE_TYPE", h: int = None
    ) -> Union[int, np.ndarray]:
        """
        Returns
        -------
        Union[int, np.ndarray]
            The representation corresponding to the state given in input. Episodic MDPs also require the current
            in-episode time step.
        """
        if self.emission_map is None:
            return self.node_to_index[node]
        return self.emission_map.get_observation(node, h)

    def reset(self) -> dm_env.TimeStep:
        """
        resets the environment to a newly sampled starting state.
        """
        self.necessary_reset = False
        self.h = 0
        self.cur_node = self.last_starting_node = self._starting_node_sampler.sample()
        node_info_class = self.get_info_class(self.cur_node)
        node_info_class.update_visitation_counts()
        return dm_env.restart(self.get_observation(self.cur_node, self.h))

    def step(self, action: int, auto_reset=False) -> dm_env.TimeStep:
        """
        takes a step in the MDP for the given action. When auto_reset is set to True, the MDP is automatically
        reset at the end of the episodes.

        Returns
        -------
        dm_env.TimeStep
            The TimeStep object containing the transition information.
        """
        if auto_reset and self.necessary_reset:
            return self.reset()
        assert not self.necessary_reset

        # This can be the in-episode time step (episodic setting) or the total number of time steps (continuous setting)
        self.h += 1

        # In case the action is a numpy array
        action = int(action)

        # Moving the current state according to the action played
        old_node = self.cur_node
        self.cur_node = self.get_info_class(old_node).sample_next_state(action)
        self.last_edge = old_node, self.cur_node
        node_info_class = self.get_info_class(self.cur_node)
        node_info_class.update_visitation_counts(action)

        # Calculate reward and observation
        reward = self.sample_reward(old_node, action, self.cur_node)
        observation = self.get_observation(self.cur_node, self.h)

        # Wrapping the time step in a dm_env.TimeStep
        if self.is_episodic() and self.h >= self.H:
            self.necessary_reset = True
            if self.emission_map is None:
                observation = -1
            else:
                observation = np.zeros_like(self.observation_spec().generate_value())
            return dm_env.termination(reward=reward, observation=observation)
        return dm_env.transition(reward=reward, observation=observation)

    def random_steps(
        self, n: int, auto_reset=False
    ) -> List[Tuple[dm_env.TimeStep, int]]:
        """
        takes n steps with random actions and returns the resulting time steps together with the random actions.
        If auto_reset is set to True, then it automatically resets episodic MDPs.

        Returns
        -------
        List[Tuple[dm_env.TimeStep, int]]
            The list of TimeStep objects containing the transition information and the actions performed.
        """

        data = []
        for _ in range(n):
            action = int(self._rng.randint(self.action_spec().num_values))
            ts = self.step(action, auto_reset)
            data.append((ts, action))
        return data

    def random_step(self, auto_reset=False) -> Tuple[dm_env.TimeStep, int]:
        """
        takes a step with a random action and returns both the next time step and the random action. If auto_reset
        is set to True, then it automatically resets episodic MDPs.

        Returns
        -------
        dm_env.TimeStep
            The TimeStep object containing the transition information.
        int
            The action performed.
        """
        action = int(self._rng.randint(self.action_spec().num_values))
        ts = self.step(action, auto_reset)
        return ts, action

    def get_visitation_counts(
        self, state_only=True
    ) -> Dict[Union["NODE_TYPE", Tuple["NODE_TYPE", "ACTION_TYPE"]], int]:
        """
        when state_only is True, it returns the visitation counts for the states; when it is False, it returns the
        visitation counts for state-action pairs.
        """
        if state_only:
            return {
                node: self.get_info_class(node).state_visitation_count
                for node in self.G.nodes
            }
        return {
            (node, a): self.get_info_class(node).actions_visitation_count[a]
            for node in self.G.nodes
            for a in range(self.n_actions)
        }

    def reset_visitation_counts(self):
        """
        resets the visitation counts to zero for all states and state-action pairs.
        """
        for node in self.G.nodes:
            self.get_info_class(node).state_visitation_count = 0
            for a in range(self.n_actions):
                self.get_info_class(node).actions_visitation_count[a] = 0

    def get_value_node_labels(
        self: Union["ContinuousMDP", "EpisodicMDP"], V: np.ndarray = None
    ) -> Dict["NODE_TYPE", float]:
        """
        returns a mapping from states to state values. By default, it uses the optimal value function.
        """
        if V is None:
            _, V = self.optimal_value_functions
        else:
            if self.is_episodic():
                h, d = V.shape
                assert h == self.H and d == self.n_states
            else:
                assert len(V) == self.n_states
        return {
            node: np.round(
                V[0, self.node_to_index[node]]
                if self.is_episodic()
                else V[self.node_to_index[node]],
                2,
            )
            for node in self.G.nodes
        }
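
The following sketch shows how a concrete MDP built on BaseMDP can be driven through the standard dm_env loop with reset, random_step, and get_visitation_counts. The subclass name RiverSwimContinuous and its import path are assumptions made for illustration; any non-abstract MDP shipped with the package exposes the same interface, since BaseMDP implements dm_env.Environment.

# The import below is an assumption for illustration; substitute any concrete
# MDP class from the package (the abstract BaseMDP itself cannot be instantiated).
from colosseum.mdp.river_swim import RiverSwimContinuous

mdp = RiverSwimContinuous(seed=42)

ts = mdp.reset()  # dm_env.TimeStep wrapping the initial observation
for _ in range(10):
    # random_step draws a uniformly random action and forwards it to step().
    ts, action = mdp.random_step(auto_reset=True)
    print(action, ts.reward, ts.observation)

print(mdp.get_visitation_counts())  # node -> visitation count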
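
The cached measures of hardness can be queried one by one through get_measure_from_name, using the code names returned by get_available_hardness_measures, or inspected in bulk through the summary property that also feeds __str__. A sketch, again assuming the hypothetical RiverSwimContinuous subclass from the previous example:

from colosseum.mdp.river_swim import RiverSwimContinuous  # hypothetical concrete subclass

mdp = RiverSwimContinuous(seed=42)

# Query each measure by its code name: ["diameter", "value_norm", "suboptimal_gaps"].
for name in mdp.get_available_hardness_measures():
    print(name, mdp.get_measure_from_name(name))

# Parameters, measures of hardness and graph metrics grouped together.
print(mdp.summary)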
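
produce_gin_file_from_mdp_parameters emits one gin binding per parameter in the form prms_{index}/{mdp_class_name}.{key} = {value}, one per line. A small sketch of the resulting string; the class name and parameter values below are made up for illustration:

from colosseum.mdp.base import BaseMDP

gin_str = BaseMDP.produce_gin_file_from_mdp_parameters(
    dict(seed=0, p_rand=0.1), "RiverSwimContinuous", index=3
)
print(gin_str)
# prms_3/RiverSwimContinuous.seed = 0
# prms_3/RiverSwimContinuous.p_rand = 0.1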
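
In the continuous (non-episodic) setting, the optimal, worst and random value functions and policies are computed lazily with the discounted value iteration and policy evaluation routines and then cached. A minimal sketch, under the same assumption of a hypothetical continuous subclass:

import numpy as np

from colosseum.mdp.river_swim import RiverSwimContinuous  # hypothetical concrete subclass

mdp = RiverSwimContinuous(seed=42)

Q, V = mdp.optimal_value_functions                           # cached discounted value iteration
pi_star = mdp.get_optimal_policy(stochastic_form=True)       # one Dirac row per state
Q_rand, V_rand = mdp.get_value_functions(mdp.random_policy)  # discounted policy evaluation

print(np.round(V[:5], 2), np.round(V_rand[:5], 2))
print("diameter:", mdp.diameter, "value norm:", mdp.value_norm)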
806 """ 807 if self._wsd is None: 808 self._wsd = get_stationary_distribution( 809 self.worst_transition_probabilities, 810 self.starting_states_and_probs, 811 ) 812 assert not np.isnan(self._wsd).any() 813 return self._wsd 814 815 @property 816 def random_stationary_distribution(self) -> np.array: 817 """ 818 Returns 819 ------- 820 np.ndarray 821 the stationary distribution of the Markov chain yielded by the random uniform policy. 822 """ 823 if self._rsd is None: 824 self._rsd = get_stationary_distribution( 825 self.random_transition_probabilities, 826 None, 827 ) 828 return self._rsd 829 830 @property 831 def optimal_average_rewards(self) -> np.ndarray: 832 """ 833 Returns 834 ------- 835 np.ndarray 836 The average rewards obtained for each state when following the optimal policy. 837 """ 838 if self._oars is None: 839 R = self.R_cf if self.is_episodic() else self.R 840 pi = ( 841 self.get_optimal_policy_continuous_form(True) 842 if self.is_episodic() 843 else self.get_optimal_policy(True) 844 ) 845 self._oars = get_average_rewards(R, pi) 846 return self._oars 847 848 @property 849 def worst_average_rewards(self) -> np.ndarray: 850 """ 851 Returns 852 ------- 853 np.ndarray 854 The average rewards obtained for each state when following the worst policy. 855 """ 856 if self._wars is None: 857 R = self.R_cf if self.is_episodic() else self.R 858 pi = ( 859 self.get_worst_policy_continuous_form(True) 860 if self.is_episodic() 861 else self.get_worst_policy(True) 862 ) 863 self._wars = get_average_rewards(R, pi) 864 return self._wars 865 866 @property 867 def random_average_rewards(self) -> np.ndarray: 868 """ 869 Returns 870 ------- 871 np.ndarray 872 The average rewards obtained for each state when following the random uniform policy. 873 """ 874 if self._rars is None: 875 R = self.R_cf if self.is_episodic() else self.R 876 pi = self.random_policy_cf if self.is_episodic() else self.random_policy 877 self._rars = get_average_rewards(R, pi) 878 return self._rars 879 880 def get_average_reward(self, policy: np.ndarray) -> float: 881 r""" 882 Parameters 883 ---------- 884 policy : np.ndarray 885 a numpy array containing a continuous policy. 886 Returns 887 ------- 888 average_reward : float 889 the expected time average reward, i.e. 890 :math:`\underset{T\to \infty}{\text{lim inf}} \frac{1}{T} \mathbb{E}_\pi \sum_{t=0}^T R_t`, associated with 891 the policy. 892 """ 893 sd = self.get_stationary_distribution(policy) 894 return sum(sd * get_average_rewards(self.R, policy)) 895 896 @property 897 def optimal_average_reward(self) -> float: 898 r""" 899 Returns 900 ------- 901 average_reward : float 902 the expected time average reward, i.e. 903 :math:`\underset{T\to \infty}{\text{lim inf}} \frac{1}{T} \mathbb{E}_\pi \sum_{t=0}^T R_t`, associated with 904 the optimal policy. 905 """ 906 if self._oar is None: 907 self._oar = sum( 908 self.optimal_stationary_distribution * self.optimal_average_rewards 909 ) 910 return self._oar 911 912 @property 913 def worst_average_reward(self) -> float: 914 r""" 915 Returns 916 ------- 917 average_reward : float 918 the expected time average reward, i.e. 919 :math:`\underset{T\to \infty}{\text{lim inf}} \frac{1}{T} \mathbb{E}_\pi \sum_{t=0}^T R_t`, associated with 920 the worst policy. 
921 """ 922 if self._war is None: 923 self._war = sum( 924 self.worst_stationary_distribution * self.worst_average_rewards 925 ) 926 return self._war 927 928 @property 929 def random_average_reward(self) -> float: 930 r""" 931 Returns 932 ------- 933 average_reward : float 934 the expected time average reward, i.e. 935 :math:`\underset{T\to \infty}{\text{lim inf}} \frac{1}{T} \mathbb{E}_\pi \sum_{t=0}^T R_t`, associated with 936 the random uniform policy. 937 """ 938 if self._rar is None: 939 self._rar = sum( 940 self.random_stationary_distribution * self.random_average_rewards 941 ) 942 return self._rar 943 944 @property 945 def transition_matrix_and_rewards(self) -> Tuple[np.ndarray, np.ndarray]: 946 """ 947 Returns 948 ------- 949 np.ndarray 950 The transition probabilities 3d numpy array. 951 np.ndarray 952 The reward matrix. 953 """ 954 if self._transition_matrix_and_rewards is None: 955 self._transition_matrix_and_rewards = get_transition_matrix_and_rewards( 956 self.n_states, 957 self.n_actions, 958 self.G, 959 self.get_info_class, 960 self.get_reward_distribution, 961 self.node_to_index, 962 self._force_sparse_transition, 963 ) 964 return self._transition_matrix_and_rewards 965 966 @property 967 def graph_layout(self) -> Dict["NODE_TYPE", Tuple[float, float]]: 968 """ 969 Returns 970 ------- 971 Dict["NODE_TYPE", Tuple[float, float]] 972 The graph layout for the MDP. It can be customized by implementing the `custom_graph_layout` function. 973 """ 974 if self._graph_layout is None: 975 self._graph_layout = ( 976 self.custom_graph_layout() 977 if hasattr(self, "custom_graph_layout") 978 else nx.nx_agraph.graphviz_layout(self.G) 979 ) 980 return self._graph_layout 981 982 @property 983 def graph_metrics(self) -> Dict[str, Any]: 984 """ 985 Returns 986 ------- 987 Dict[str, Any] 988 The dictionary containing graph metrics for the graph underlying the MDP. 989 """ 990 if self._graph_metrics is None: 991 G = self.get_episodic_graph(True) if self.is_episodic() else self.G 992 self._graph_metrics = dict() 993 self._graph_metrics["# nodes"] = len(G.nodes) 994 self._graph_metrics["# edges"] = len(G.edges) 995 return self._graph_metrics 996 997 @property 998 def diameter(self: Union["EpisodicMDP", "ContinuousMDP"]) -> float: 999 """ 1000 Returns 1001 ------- 1002 float 1003 The diameter of the MDP. 
1004 """ 1005 if self._diameter is None: 1006 if self.hardness_report: 1007 self._diameter = self.hardness_report["MDP measure of hardness"][ 1008 "diameter" 1009 ] 1010 else: 1011 self._diameter = get_diameter( 1012 self.episodic_transition_matrix_and_rewards[0] 1013 if self.is_episodic() 1014 else self.T, 1015 self.is_episodic(), 1016 ) 1017 return self._diameter 1018 1019 @property 1020 def sum_reciprocals_suboptimality_gaps( 1021 self: Union["EpisodicMDP", "ContinuousMDP"] 1022 ) -> float: 1023 """ 1024 Returns 1025 ------- 1026 float 1027 The sum of the reciprocals of the sub-optimality gaps 1028 """ 1029 if self._sum_reciprocals_suboptimality_gaps is None: 1030 if self.hardness_report: 1031 self._sum_reciprocals_suboptimality_gaps = self.hardness_report[ 1032 "MDP measure of hardness" 1033 ]["suboptimal_gaps"] 1034 else: 1035 self._sum_reciprocals_suboptimality_gaps = ( 1036 get_sum_reciprocals_suboptimality_gaps( 1037 *self.optimal_value_functions, 1038 self.reachable_states if self.is_episodic() else None, 1039 ) 1040 ) 1041 return self._sum_reciprocals_suboptimality_gaps 1042 1043 def _compute_value_norm(self, discounted: bool) -> float: 1044 """ 1045 Returns 1046 ------- 1047 float 1048 The environmental value norm in the undiscounted or undiscounted setting. 1049 """ 1050 T, R = (self.T_cf, self.R_cf) if self.is_episodic() else (self.T, self.R) 1051 V = ( 1052 self.optimal_value_continuous_form[1] 1053 if self.is_episodic() 1054 else self.optimal_value_functions[1] 1055 ) 1056 if discounted: 1057 return calculate_norm_discounted(T, V) 1058 return calculate_norm_average( 1059 T, self.optimal_transition_probabilities, self.optimal_average_rewards 1060 ) 1061 1062 @property 1063 def discounted_value_norm(self) -> float: 1064 """ 1065 Returns 1066 ------- 1067 float 1068 The environmental value norm in the discounted setting. 1069 """ 1070 if True not in self._optimal_value_norm: 1071 if ( 1072 self._are_all_transition_deterministic 1073 and self._are_all_rewards_deterministic 1074 ): 1075 self._optimal_value_norm[True] = 0.0 1076 elif self.hardness_report: 1077 self._optimal_value_norm[True] = self.hardness_report[ 1078 "MDP measure of hardness" 1079 ]["value_norm"] 1080 else: 1081 self._optimal_value_norm[True] = self._compute_value_norm(True) 1082 return self._optimal_value_norm[True] 1083 1084 @property 1085 def undiscounted_value_norm(self) -> float: 1086 """ 1087 Returns 1088 ------- 1089 float 1090 The environmental value norm in the undiscounted setting. 1091 """ 1092 if False not in self._optimal_value_norm: 1093 self._optimal_value_norm[False] = self._compute_value_norm(False) 1094 return self._optimal_value_norm[False] 1095 1096 @property 1097 def value_norm(self): 1098 """ 1099 is an alias for the discounted value norm. 1100 """ 1101 return self.discounted_value_norm 1102 1103 @property 1104 def measures_of_hardness(self) -> Dict[str, float]: 1105 """ 1106 Returns 1107 ------- 1108 Dict[str, float] 1109 The dictionary containing all the measures of hardness availble. 1110 """ 1111 return dict( 1112 diameter=self.diameter, 1113 suboptimal_gaps=self.sum_reciprocals_suboptimality_gaps, 1114 value_norm=self.value_norm, 1115 ) 1116 1117 @property 1118 def summary(self) -> Dict[str, Dict[str, Any]]: 1119 """ 1120 Returns 1121 ------- 1122 Dict[str, Dict[str, Any]] 1123 The dictionary with information about the parameters, the measures of hardness and the graph metrics. 
1124 """ 1125 if self._summary is None: 1126 self._summary = { 1127 "Parameters": clean_for_storing(self.parameters), 1128 "Measure of hardness": clean_for_storing(self.measures_of_hardness), 1129 "Graph metrics": clean_for_storing(self.graph_metrics), 1130 } 1131 return self._summary 1132 1133 @property 1134 def hardness_report(self) -> Union[Dict, None]: 1135 """ 1136 looks for a cached hardness report in the folder given in input when the MDP was created. It returns None if it 1137 is not able to find it. 1138 """ 1139 if self._hr is None: 1140 report_file = find_hardness_report_file(self, self._hardness_reports_folder) 1141 if report_file: 1142 with open(report_file, "r") as f: 1143 report = yaml.load(f, yaml.Loader) 1144 self._hr = report 1145 else: 1146 self._hr = False 1147 if self._hr: 1148 return self._hr 1149 return None 1150 1151 def get_info_class(self, n: "NODE_TYPE") -> NodeInfoClass: 1152 """ 1153 Returns 1154 ------- 1155 NodeInfoClass 1156 The container class (NodeInfoClass) associated with state n. 1157 """ 1158 return self.G.nodes[n]["info_class"] 1159 1160 def get_transition_distributions( 1161 self, node: "NODE_TYPE" 1162 ) -> Dict[int, Union[rv_continuous, NextStateSampler]]: 1163 """ 1164 Returns 1165 ------- 1166 Dict[int, Union[rv_continuous, NextStateSampler]] 1167 The dictionary containing the transition distributions for any action at the state given in input. 1168 """ 1169 return self.get_info_class(node).transition_distributions 1170 1171 def get_reward_distribution( 1172 self, node: "NODE_TYPE", action: "ACTION_TYPE", next_node: "NODE_TYPE" 1173 ) -> rv_continuous: 1174 """ 1175 Returns 1176 ------- 1177 rv_continuous 1178 The reward distribution for transitioning in next_node when selecting action from state. 1179 """ 1180 if (node, action, next_node) not in self._cached_reward_distributions: 1181 self._cached_reward_distributions[ 1182 node, action, next_node 1183 ] = self._get_reward_distribution( 1184 node, self._inverse_action_mapping(node, action), next_node 1185 ) 1186 return self._cached_reward_distributions[node, action, next_node] 1187 1188 def sample_reward( 1189 self, node: "NODE_TYPE", action: "ACTION_TYPE", next_node: "NODE_TYPE" 1190 ) -> float: 1191 """ 1192 Returns 1193 ------- 1194 float 1195 A sample from the reward distribution when transitioning in next_node when selecting action from state. 1196 """ 1197 if (node, action, next_node) not in self._cached_rewards or len( 1198 self._cached_rewards[node, action, next_node] 1199 ) == 0: 1200 self._cached_rewards[node, action, next_node] = ( 1201 self.get_reward_distribution(node, action, next_node) 1202 .rvs(5000, random_state=self._rng) 1203 .tolist() 1204 ) 1205 r = self._cached_rewards[node, action, next_node].pop(0) 1206 return ( 1207 r * (self.rewards_range[1] - self.rewards_range[0]) - self.rewards_range[0] 1208 ) 1209 1210 def get_measure_from_name(self, measure_name: str) -> float: 1211 """ 1212 Parameters 1213 ---------- 1214 measure_name : str 1215 the code name for the measure. The available code names can be obtained using the 1216 ``BaseMDP.get_available_hardness_measures`` function. 1217 Returns 1218 ------- 1219 float 1220 The value for the hardness measure. 
1221 """ 1222 if measure_name == "diameter": 1223 return self.diameter 1224 elif measure_name in ["value_norm", "environmental_value_norm"]: 1225 return self.value_norm 1226 elif measure_name == "suboptimal_gaps": 1227 return self.sum_reciprocals_suboptimality_gaps 1228 else: 1229 raise ValueError( 1230 f"{measure_name} is not a valid hardness measure name: available ones are " 1231 + str(self.get_available_hardness_measures()) 1232 ) 1233 1234 def action_spec(self) -> DiscreteArray: 1235 """ 1236 Returns 1237 ------- 1238 DiscreteArray 1239 The action spec of the environment. 1240 """ 1241 return DiscreteArray(self.n_actions, name="action") 1242 1243 def observation_spec(self) -> Array: 1244 """ 1245 Returns 1246 ------- 1247 Array 1248 The observation spec of the environment. 1249 """ 1250 if self.emission_map is None: 1251 return DiscreteArray(self.n_states, name="observation") 1252 obs = self.get_observation(self.starting_nodes[0], 0) 1253 return BoundedArray(obs.shape, obs.dtype, -np.inf, np.inf, "observation") 1254 1255 def get_observation( 1256 self, node: "NODE_TYPE", h: int = None 1257 ) -> Union[int, np.ndarray]: 1258 """ 1259 Returns 1260 ------- 1261 Union[int, np.ndarray] 1262 The representation corresponding to the state given in input. Episodic MDPs also requires the current 1263 in-episode time step. 1264 """ 1265 if self.emission_map is None: 1266 return self.node_to_index[self.cur_node] 1267 return self.emission_map.get_observation(node, h) 1268 1269 def reset(self) -> dm_env.TimeStep: 1270 """ 1271 resets the environment to a newly sampled starting state. 1272 """ 1273 self.necessary_reset = False 1274 self.h = 0 1275 self.cur_node = self.last_starting_node = self._starting_node_sampler.sample() 1276 node_info_class = self.get_info_class(self.cur_node) 1277 node_info_class.update_visitation_counts() 1278 return dm_env.restart(self.get_observation(self.cur_node, self.h)) 1279 1280 def step(self, action: int, auto_reset=False) -> dm_env.TimeStep: 1281 """ 1282 takes a step in the MDP for the given action. When auto_reset is set to True then it automatically reset the 1283 at the end of the episodes. 1284 Returns 1285 ------- 1286 dm_env.TimeStep 1287 The TimeStep object containing the transition information. 
1288 """ 1289 if auto_reset and self.necessary_reset: 1290 return self.reset() 1291 assert not self.necessary_reset 1292 1293 # This can be the in episode time step (episodic setting) or the total numuber of time steps (continuous setting) 1294 self.h += 1 1295 1296 # In case the action is a numpy array 1297 action = int(action) 1298 1299 # Moving the current state according to the action played 1300 old_node = self.cur_node 1301 self.cur_node = self.get_info_class(old_node).sample_next_state(action) 1302 self.last_edge = old_node, self.cur_node 1303 node_info_class = self.get_info_class(self.cur_node) 1304 node_info_class.update_visitation_counts(action) 1305 1306 # Calculate reward and observation 1307 reward = self.sample_reward(old_node, action, self.cur_node) 1308 observation = self.get_observation(self.cur_node, self.h) 1309 1310 # Wrapping the time step in a dm_env.TimeStep 1311 if self.is_episodic() and self.h >= self.H: 1312 self.necessary_reset = True 1313 if self.emission_map is None: 1314 observation = -1 1315 else: 1316 observation = np.zeros_like(self.observation_spec().generate_value()) 1317 return dm_env.termination(reward=reward, observation=observation) 1318 return dm_env.transition(reward=reward, observation=observation) 1319 1320 def random_steps( 1321 self, n: int, auto_reset=False 1322 ) -> List[Tuple[dm_env.TimeStep, int]]: 1323 """ 1324 takes n a step with a random action and returns both the next step and the random action. If auto_reset is set to 1325 True than it automatically resets episodic MDPs. 1326 1327 Returns 1328 ------- 1329 dm_env.TimeStep 1330 The TimeStep object containing the transition information. 1331 int 1332 The action performed. 1333 """ 1334 1335 data = [] 1336 for _ in range(n): 1337 action = int(self._rng.randint(self.action_spec().num_values)) 1338 ts = self.step(action, auto_reset) 1339 data.append((ts, action)) 1340 return data 1341 1342 def random_step(self, auto_reset=False) -> Tuple[dm_env.TimeStep, int]: 1343 """ 1344 takes a step with a random action and returns both the next step and the random action. If auto_reset is set to 1345 True than it automatically resets episodic MDPs. 1346 1347 Returns 1348 ------- 1349 dm_env.TimeStep 1350 The TimeStep object containing the transition information. 1351 int 1352 The action performed. 1353 """ 1354 action = int(self._rng.randint(self.action_spec().num_values)) 1355 ts = self.step(action, auto_reset) 1356 return ts, action 1357 1358 def get_visitation_counts( 1359 self, state_only=True 1360 ) -> Dict[Union["NODE_TYPE", Tuple["NODE_TYPE", "ACTION_TYPE"]], int]: 1361 """ 1362 when state_only is True it returns the visitation counts for the states and when it is False it returns the 1363 visitation counts for state action pairs. 1364 """ 1365 if state_only: 1366 return { 1367 node: self.get_info_class(node).state_visitation_count 1368 for node in self.G.nodes 1369 } 1370 return { 1371 (node, a): self.get_info_class(node).actions_visitation_count[a] 1372 for node in self.G.nodes 1373 for a in range(self.n_actions) 1374 } 1375 1376 def reset_visitation_counts(self): 1377 """ 1378 resets the visitation counts to zero for all states and state action pairs. 
1379 """ 1380 for node in self.G.nodes: 1381 self.get_info_class(node).state_visitation_count = 0 1382 for a in range(self.n_actions): 1383 self.get_info_class(node).actions_visitation_count[a] = 0 1384 1385 def get_value_node_labels( 1386 self: Union["ContinuousMDP", "EpisodicMDP"], V: np.ndarray = None 1387 ) -> Dict["NODE_TYPE", float]: 1388 """ 1389 returns a mapping from state to state values. By default, it uses the optimal values. 1390 """ 1391 if V is None: 1392 _, V = self.optimal_value_functions 1393 else: 1394 if isinstance(self, EpisodicMDP): 1395 h, d = V.shape 1396 assert h == self.H and d == self.n_states 1397 else: 1398 assert len(V) == self.n_states 1399 return { 1400 node: np.round( 1401 V[0, self.node_to_index[node]] 1402 if self.is_episodic() 1403 else V[self.node_to_index[node]], 1404 2, 1405 ) 1406 for node in self.G.nodes 1407 }
The base class for MDPs.
327 @abc.abstractmethod 328 def __init__( 329 self: Union["EpisodicMDP", "ContinuousMDP"], 330 seed: int, 331 randomize_actions: bool = True, 332 make_reward_stochastic=False, 333 reward_variance_multiplier: float = 1.0, 334 p_lazy: float = None, 335 p_rand: float = None, 336 rewards_range: Tuple[float, float] = (0.0, 1.0), 337 emission_map: Type[EmissionMap] = Tabular, 338 emission_map_kwargs: Dict[str, Any] = dict(), 339 noise: Type[EmissionMap] = None, 340 noise_kwargs: Dict[str, Any] = dict(), 341 instantiate_mdp: bool = True, 342 force_sparse_transition: bool = False, 343 exclude_horizon_from_parameters: bool = False, 344 ): 345 """ 346 Parameters 347 ---------- 348 seed : int 349 The random seed. 350 randomize_actions : bool 351 If True, the outcome of action at different states is randomized. By default, it is set to True. 352 make_reward_stochastic : bool 353 If True, the rewards of the MDP will be stochastic. By default, it is set to False. 354 reward_variance_multiplier : float 355 A multiplier for the variance of the rewards that keeps the mean fixed. The lower the value, the higher the 356 variance. By default, it is set to 1. 357 p_lazy : float 358 The probability that an MDP stays in the same state instead of executing the action selected by an agent. By default, it is set to zero. 359 p_rand : float 360 The probability of selecting an action at random instead of the one specified by the agent. By default, it 361 is set to zero. 362 rewards_range : Tuple[float, float] 363 The maximum range of values for the rewards. By default, it is `(0, 1)`. 364 emission_map : Type[EmissionMap] 365 The emission map that renders the MPD non-tabular. By default, no emission map is used. 366 emission_map_kwargs : Dict[str, Any] 367 The keyword arguments for the emission map. 368 noise : Type[EmissionMap] 369 The noise class to make the emission map stochastic. By default, the emission map is left deterministic. 370 noise_kwargs : Dict[str, Any] 371 The keyword arguments for the noise. 372 instantiate_mdp : bool 373 If True, the underlying graph for the MDP is immediately instantiated. By default, it is set to True. 374 force_sparse_transition : bool 375 If True, the transition matrix is forced to be stored in a sparse matrix formate. By default, it is not 376 enforced. 377 exclude_horizon_from_parameters : bool 378 If True, in the episodic case, the horizon is not considered a parameter when computing the `parameters` 379 properties and the hash. By default, it is not excluded. 
380 """ 381 382 # MDP generic variables 383 self._seed = seed 384 self._randomize_actions = randomize_actions 385 self._make_reward_stochastic = make_reward_stochastic 386 self._reward_variance_multiplier = reward_variance_multiplier 387 self._hardness_reports_folder = config.get_hardness_measures_cache_folder() 388 self._force_sparse_transition = force_sparse_transition 389 self._exclude_horizon_from_parameters = exclude_horizon_from_parameters 390 self._p_rand = p_rand if p_rand is None or p_rand > 0.0 else None 391 self._p_lazy = p_lazy if p_lazy is None or p_lazy > 0.0 else None 392 self.rewards_range = self._rewards_range = ( 393 rewards_range 394 if rewards_range[0] < rewards_range[1] 395 else rewards_range[::-1] 396 ) 397 self._are_all_rewards_deterministic = True 398 self._are_all_transition_deterministic = True 399 self.r_min, self.r_max = self.rewards_range 400 401 # MDP loop variables 402 self._hr = None 403 self.cur_node = None 404 self.last_edge = None 405 self.last_starting_node = None 406 self.is_reset_necessary = True 407 self.current_timestep = 0 408 self.h = 0 409 self._rng = np.random.RandomState(seed) 410 self._fast_rng = random.Random(seed) 411 412 # Caching variables. Note that we cannot use lru_cache because it prevents from deleting the objects. 413 self._cached_rewards = dict() 414 self._cached_reward_distributions = dict() 415 self._action_mapping = dict() 416 self._communication_class = None 417 self._recurrent_nodes_set = None 418 if not hasattr(self, "_transition_matrix_and_rewards"): 419 self._transition_matrix_and_rewards = None 420 self._graph_layout = None 421 self._graph_metrics = None 422 self._summary = None 423 self._diameter = None 424 self._sum_reciprocals_suboptimality_gaps = None 425 self._optimal_value_norm = dict() 426 self._optimal_value = None 427 self._worst_value = None 428 self._random_value = None 429 self._optimal_policy = dict() 430 self._worst_policy = dict() 431 self._otp = None 432 self._omc = None 433 self._osd = None 434 self._oars = None 435 self._oar = None 436 self._wtp = None 437 self._wmc = None 438 self._wsd = None 439 self._wars = None 440 self._war = None 441 self._rtp = None 442 self._rmc = None 443 self._rsd = None 444 self._rars = None 445 self._rar = None 446 447 if instantiate_mdp: 448 self.instantiate_MDP() 449 450 # Emission map 451 if emission_map == Tabular or emission_map is None: 452 self.emission_map = None 453 self.is_tabular = True 454 else: 455 noise_kwargs["seed"] = seed 456 self.emission_map = emission_map( 457 self, 458 noise_class=noise, 459 noise_kwargs=noise_kwargs, 460 **emission_map_kwargs, 461 ) 462 self.is_tabular = self.emission_map.is_tabular
Parameters
- seed (int): The random seed.
- randomize_actions (bool): If True, the outcome of action at different states is randomized. By default, it is set to True.
- make_reward_stochastic (bool): If True, the rewards of the MDP will be stochastic. By default, it is set to False.
- reward_variance_multiplier (float): A multiplier for the variance of the rewards that keeps the mean fixed. The lower the value, the higher the variance. By default, it is set to 1.
- p_lazy (float): The probability that an MDP stays in the same state instead of executing the action selected by an agent. By default, it is set to zero.
- p_rand (float): The probability of selecting an action at random instead of the one specified by the agent. By default, it is set to zero.
- rewards_range (Tuple[float, float]): The maximum range of values for the rewards. By default, it is `(0, 1)`.
- emission_map (Type[EmissionMap]): The emission map that renders the MDP non-tabular. By default, no emission map is used.
- emission_map_kwargs (Dict[str, Any]): The keyword arguments for the emission map.
- noise (Type[EmissionMap]): The noise class to make the emission map stochastic. By default, the emission map is left deterministic.
- noise_kwargs (Dict[str, Any]): The keyword arguments for the noise.
- instantiate_mdp (bool): If True, the underlying graph for the MDP is immediately instantiated. By default, it is set to True.
- force_sparse_transition (bool): If True, the transition matrix is forced to be stored in a sparse matrix format. By default, it is not enforced.
- exclude_horizon_from_parameters (bool): If True, in the episodic case, the horizon is not considered a parameter when computing the `parameters` property and the hash. By default, it is not excluded.
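These constructor parameters are shared by every MDP family in the package. As a rough usage sketch (the `RiverSwimContinuous` class, its import path, and its `size` parameter are assumptions used purely for illustration), a concrete subclass would be constructed as follows:

```python
# Illustrative sketch only: the subclass name, import path and the `size`
# parameter are assumptions; the remaining keyword arguments are the BaseMDP
# parameters documented above.
from colosseum.mdp.river_swim import RiverSwimContinuous  # hypothetical path

mdp = RiverSwimContinuous(
    seed=42,                         # drives every random choice in the MDP
    size=10,                         # family-specific parameter (assumed)
    randomize_actions=True,          # shuffle the action semantics per state
    make_reward_stochastic=True,     # sample rewards from distributions
    reward_variance_multiplier=1.0,  # lower values -> higher reward variance
    p_rand=0.05,                     # 5% chance a random action replaces the agent's
    p_lazy=None,                     # never forced to stay in the same state
    rewards_range=(0.0, 1.0),
)
```

Leaving `emission_map` at its default keeps the environment tabular, i.e. observations are plain state indices; passing a non-tabular `EmissionMap` subclass switches to array observations.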
51 @staticmethod 52 @abc.abstractmethod 53 def get_unique_symbols() -> List[str]: 54 """ 55 Returns 56 ------- 57 List[str] 58 the unique symbols of the grid representation of the MDP. 59 """
Returns
- List[str]: the unique symbols of the grid representation of the MDP.
61 @staticmethod 62 def get_available_hardness_measures() -> List[str]: 63 """ 64 Returns 65 ------- 66 List[str] 67 The hardness measures available in the package. 68 """ 69 return ["diameter", "value_norm", "suboptimal_gaps"]
Returns
- List[str]: The hardness measures available in the package.
71 @staticmethod 72 def produce_gin_file_from_mdp_parameters( 73 parameters: Dict[str, Any], mdp_class_name: str, index: int = 0 74 ) -> str: 75 """ 76 Parameters 77 ---------- 78 parameters : Dict[str, Any] 79 The parameters of the MDP. 80 mdp_class_name : str 81 The name of the class of the MDP. 82 index : int 83 The index for the gin configuration 84 85 Returns 86 ------- 87 str 88 The gin configuration for the given parameters and index. 89 """ 90 91 string = "" 92 for k, v in parameters.items(): 93 string += f"prms_{index}/{mdp_class_name}.{k} = {v}\n" 94 return string
Parameters
- parameters (Dict[str, Any]): The parameters of the MDP.
- mdp_class_name (str): The name of the class of the MDP.
- index (int): The index for the gin configuration
Returns
- str: The gin configuration for the given parameters and index.
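Since, as the source above shows, the gin string is just one `prms_{index}/{class_name}.{parameter} = {value}` line per parameter, the following sketch (with arbitrary example values and class name) illustrates the expected output:

```python
from colosseum.mdp.base import BaseMDP

# Arbitrary illustrative parameters and class name.
gin_str = BaseMDP.produce_gin_file_from_mdp_parameters(
    parameters={"size": 10, "p_rand": 0.05},
    mdp_class_name="RiverSwimContinuous",
    index=3,
)
print(gin_str)
# prms_3/RiverSwimContinuous.size = 10
# prms_3/RiverSwimContinuous.p_rand = 0.05
```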
96 @staticmethod 97 @abc.abstractmethod 98 def does_seed_change_MDP_structure() -> bool: 99 """ 100 Returns 101 ------- 102 bool 103 True if when changing the seed the transition matrix and/or rewards matrix change. This for example may 104 happen when there are fewer starting states that possible one and the effective starting states are picked 105 randomly based on the seed. 106 """
Returns
- bool: True if changing the seed changes the transition matrix and/or the rewards matrix. This may happen, for example, when there are fewer starting states than possible ones and the effective starting states are picked randomly based on the seed.
108 @staticmethod 109 @abc.abstractmethod 110 def is_episodic() -> bool: 111 """ 112 Returns 113 ------- 114 bool 115 True if the MDP is episodic. 116 """
Returns
- bool: True if the MDP is episodic.
118 @staticmethod 119 @abc.abstractmethod 120 def sample_parameters(n: int, seed: int = None) -> List[Dict[str, Any]]: 121 """ 122 Returns 123 ------- 124 List[Dict[str, Any]] 125 n sampled parameters that can be used to construct an MDP in a reasonable amount of time. 126 """
Returns
- List[Dict[str, Any]]: n sampled parameters that can be used to construct an MDP in a reasonable amount of time.
128 @staticmethod 129 @abc.abstractmethod 130 def sample_mdp_parameters( 131 n: int, is_episodic: bool, seed: int = None 132 ) -> List[Dict[str, Any]]: 133 """ 134 Returns 135 ------- 136 List[Dict[str, Any]] 137 n sampled parameters that can be used to construct an MDP in a reasonable amount of time. 138 """
Returns
- List[Dict[str, Any]]: n sampled parameters that can be used to construct an MDP in a reasonable amount of time.
140 @staticmethod 141 @abc.abstractmethod 142 def get_node_class() -> Type["NODE_TYPE"]: 143 """ 144 Returns 145 ------- 146 Type["NODE_TYPE"] 147 The class of the nodes of the MDP. 148 """
Returns
- Type["NODE_TYPE"]: The class of the nodes of the MDP.
200 @abc.abstractmethod 201 def get_grid_representation(self, node: "NODE_TYPE", h: int = None) -> np.ndarray: 202 """ 203 Returns 204 ------- 205 np.ndarray 206 An ASCII representation of the state given in input stored as numpy array. 207 """
Returns
- np.ndarray: An ASCII representation of the state given in input stored as numpy array.
247 @abc.abstractmethod 248 def get_gin_parameters(self, index: int) -> str: 249 """ 250 Returns 251 ------- 252 str 253 The gin config of the MDP instance. 254 """
Returns
- str: The gin config of the MDP instance.
256 def get_gin_config(self, index: int) -> str: 257 """ 258 Returns 259 ------- 260 str 261 The gin config file of the MDP. 262 """ 263 return "".join(self.get_gin_parameters(index))
Returns
- str: The gin config file of the MDP.
265 def get_node_labels(self, l: List[Any]) -> Dict["NODE_TYPE", Any]: 266 """ 267 Returns 268 ------- 269 Dict["NODE_TYPE", Any] 270 A dictionary that assigns to each node the value of the list in the corresponding index. 271 """ 272 assert len(l) == self.n_states, ( 273 f"The array in input should have the same dimensionality of the number of states {self.n_states}, " 274 f" {len(l)} long array received instead." 275 ) 276 return {self.index_to_node[i]: l[i] for i in range(len(l))}
Returns
- Dict["NODE_TYPE", Any]: A dictionary that assigns to each node the value of the list in the corresponding index.
278 def get_node_action_labels(self, l: List[List[Any]]) -> Dict["NODE_TYPE", float]: 279 """ 280 Returns 281 ------- 282 Dict["NODE_TYPE", Any] 283 A dictionary that assigns to each node and action the value of the list in the corresponding index. 284 """ 285 assert len(l) == self.n_states, ( 286 f"The list in input should be the same length of the number of states {self.n_states}, " 287 f" list of length {len(l)} received instead." 288 ) 289 assert all( 290 len(ll) == self.n_actions for ll in l 291 ), f"The lists inside the lists should be the same length as the number of actions {self.n_actions}, " 292 return { 293 (self.index_to_node[i], action): l[i][action] 294 for i in range(len(l)) 295 for action in range(self.n_actions) 296 }
Returns
- Dict["NODE_TYPE", Any]: A dictionary that assigns to each node and action the value of the list in the corresponding index.
Returns
- str: The hash value based on the parameters of the MDP. This can be used to create cache files.
464 def instantiate_MDP(self): 465 """ 466 recurisively instantiate the MDP. 467 """ 468 # Instantiating the MDP 469 self._check_parameters_in_input() 470 self._starting_node_sampler = self._get_starting_node_sampler() 471 self.starting_nodes = self._starting_node_sampler.next_nodes 472 self.G = nx.DiGraph() 473 self._instantiate_mdp() 474 self.n_states = len(self.G.nodes) 475 476 # Some shortcuts variables 477 if not self.is_episodic(): 478 self._vi, self._pe = ( 479 discounted_value_iteration, 480 discounted_policy_evaluation, 481 ) 482 self.random_policy = ( 483 np.ones((self.n_states, self.n_actions), dtype=np.float32) 484 / self.n_actions 485 ) 486 487 # Cache state to index mapping 488 mapping = self._rng.rand(self.n_states, self.n_actions).argsort(1) 489 self.node_to_index = dict() 490 self.index_to_node = dict() 491 for i, node in enumerate(self.G.nodes): 492 self.node_to_index[node] = i 493 self.index_to_node[i] = node 494 495 # Compute the starting state distribution 496 self.starting_state_distribution = np.zeros(self.n_states) 497 self.starting_states = [] 498 for n, p in self._starting_node_sampler.next_nodes_and_probs: 499 s = self.node_to_index[n] 500 self.starting_state_distribution[s] = p 501 self.starting_states.append(s) 502 self.starting_states_and_probs = list( 503 zip(self.starting_states, self._starting_node_sampler.probs) 504 )
recursively instantiates the MDP.
Returns
- Iterable["NODE_TYPE"]: The recurrent states set.
Returns
- MDPCommunicationClass: The communication class.
593 def get_optimal_policy(self, stochastic_form: bool) -> np.ndarray: 594 """ 595 Returns 596 ------- 597 np.ndarray 598 The optimal policy. It can be either in the stochastic form, so in the form of dirac probability 599 distribution or as a simple mapping to integer. 600 """ 601 if stochastic_form not in self._optimal_policy: 602 self._optimal_policy[stochastic_form] = get_policy_from_q_values( 603 self.optimal_value_functions[0], stochastic_form 604 ) 605 return self._optimal_policy[stochastic_form]
Returns
- np.ndarray: The optimal policy, either in stochastic form, i.e. as a Dirac probability distribution over actions, or as a simple mapping from states to integer actions.
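A minimal sketch of the two forms described above (the exact shapes and the argmax relation are assumptions based on the description, not guaranteed by the API):

```python
import numpy as np

def show_policy_forms(mdp):
    """Sketch: assumes the stochastic form stores one Dirac (one-hot)
    distribution over actions per state, and the non-stochastic form stores
    the corresponding integer action indices."""
    pi_stochastic = mdp.get_optimal_policy(stochastic_form=True)
    pi_greedy = mdp.get_optimal_policy(stochastic_form=False)

    # Every row of the stochastic form should sum to one (a Dirac distribution).
    assert np.allclose(pi_stochastic.sum(-1), 1.0)
    # The integer form should select the actions encoded by the one-hot rows.
    assert np.array_equal(pi_stochastic.argmax(-1), pi_greedy)
```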
607 def get_worst_policy(self, stochastic_form) -> np.ndarray: 608 """ 609 Returns 610 ------- 611 np.ndarray 612 The worst performing policy. It can be either in the stochastic form, so in the form of dirac probability 613 distribution or as a simple mapping to integer. 614 """ 615 if stochastic_form not in self._worst_policy: 616 self._worst_policy[stochastic_form] = get_policy_from_q_values( 617 self._vi(self.T, -self.R)[0], stochastic_form 618 ) 619 return self._worst_policy[stochastic_form]
Returns
- np.ndarray: The worst performing policy, either in stochastic form, i.e. as a Dirac probability distribution over actions, or as a simple mapping from states to integer actions.
621 def get_value_functions(self, policy: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: 622 """ 623 Parameters 624 ---------- 625 policy : np.ndarray 626 a numpy array containing a continuous policy. 627 Returns 628 ------- 629 Q : np.ndarray 630 the state action value function. 631 V : np.ndarray 632 the state value function. 633 """ 634 return self._pe(*self.transition_matrix_and_rewards, policy)
Parameters
- policy (np.ndarray): a numpy array containing a continuous policy.
Returns
- Q (np.ndarray): the state action value function.
- V (np.ndarray): the state value function.
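As a sketch of how `get_value_functions` relates to the cached properties below, evaluating the uniform policy explicitly should match `random_value_functions`, since both call the same policy-evaluation routine (continuous setting assumed, where `random_policy` is available):

```python
import numpy as np

def check_random_policy_evaluation(mdp):
    # Evaluate the uniform policy explicitly ...
    Q, V = mdp.get_value_functions(mdp.random_policy)
    # ... and compare it with the cached property computed the same way.
    Q_cached, V_cached = mdp.random_value_functions
    assert np.allclose(Q, Q_cached) and np.allclose(V, V_cached)
```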
Returns
- Q (np.ndarray): the optimal state action value function.
- V (np.ndarray): the optimal state value function.
Returns
- Q (np.ndarray): the state action value function of the worst policy.
- V (np.ndarray): the state value function of the worst policy.
Returns
- Q (np.ndarray): the state action value function of the random uniform policy.
- V (np.ndarray): the state value function of the random uniform policy.
Returns
- np.ndarray: The transition probabilities corresponding to the optimal policy.
Returns
- np.ndarray: The transition probabilities corresponding to the worst policy.
Returns
- np.ndarray: The transition probabilities corresponding to the random uniform policy.
Returns
- MarkovChain: The Markov chain corresponding to the optimal policy.
Returns
- MarkovChain: The Markov chain corresponding to the worst policy.
Returns
- MarkovChain: The Markov chain corresponding to the random uniform policy.
768 def get_stationary_distribution(self, policy: np.ndarray) -> np.ndarray: 769 """ 770 Parameters 771 ---------- 772 policy : np.ndarray 773 a numpy array containing a continuous policy. 774 Returns 775 ------- 776 np.ndarray 777 the stationary distribution of the Markov chain yielded by the policy. 778 """ 779 return get_stationary_distribution( 780 get_transition_probabilities(self.T, policy), 781 self.starting_states_and_probs, 782 )
Parameters
- policy (np.ndarray): a numpy array containing a continuous policy.
Returns
- np.ndarray: the stationary distribution of the Markov chain yielded by the policy.
Returns
- np.ndarray: the stationary distribution of the Markov chain yielded by the optimal policy.
Returns
- np.ndarray: the stationary distribution of the Markov chain yielded by the worst policy.
Returns
- np.ndarray: the stationary distribution of the Markov chain yielded by the random uniform policy.
Returns
- np.ndarray: The average rewards obtained for each state when following the optimal policy.
Returns
- np.ndarray: The average rewards obtained for each state when following the worst policy.
Returns
- np.ndarray: The average rewards obtained for each state when following the random uniform policy.
880 def get_average_reward(self, policy: np.ndarray) -> float: 881 r""" 882 Parameters 883 ---------- 884 policy : np.ndarray 885 a numpy array containing a continuous policy. 886 Returns 887 ------- 888 average_reward : float 889 the expected time average reward, i.e. 890 :math:`\underset{T\to \infty}{\text{lim inf}} \frac{1}{T} \mathbb{E}_\pi \sum_{t=0}^T R_t`, associated with 891 the policy. 892 """ 893 sd = self.get_stationary_distribution(policy) 894 return sum(sd * get_average_rewards(self.R, policy))
Parameters
- policy (np.ndarray): a numpy array containing a continuous policy.
Returns
- average_reward (float): the expected time average reward, i.e. \( \underset{T\to \infty}{\text{lim inf}} \frac{1}{T} \mathbb{E}_\pi \sum_{t=0}^T R_t \), associated with the policy.
Returns
- average_reward (float): the expected time average reward, i.e. \( \underset{T\to \infty}{\text{lim inf}} \frac{1}{T} \mathbb{E}_\pi \sum_{t=0}^T R_t \), associated with the optimal policy.
Returns
- average_reward (float): the expected time average reward, i.e. \( \underset{T\to \infty}{\text{lim inf}} \frac{1}{T} \mathbb{E}_\pi \sum_{t=0}^T R_t \), associated with the worst policy.
Returns
- average_reward (float): the expected time average reward, i.e. \( \underset{T\to \infty}{\text{lim inf}} \frac{1}{T} \mathbb{E}_\pi \sum_{t=0}^T R_t \), associated with the random uniform policy.
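The scalar average reward is the per-state average reward weighted by the stationary distribution of the induced Markov chain. A small sketch of that computation, mirroring the body of `get_average_reward` shown above:

```python
import numpy as np
from colosseum.mdp.utils.markov_chain import get_average_rewards

def average_reward_by_hand(mdp, policy):
    """Sketch of the quantity computed by `get_average_reward`."""
    sd = mdp.get_stationary_distribution(policy)    # stationary distribution of the induced chain
    per_state = get_average_rewards(mdp.R, policy)  # expected reward per state under the policy
    return float(np.sum(sd * per_state))

# For a well-behaved (ergodic) chain, average_reward_by_hand(mdp, mdp.random_policy)
# should be close to the cached mdp.random_average_reward.
```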
Returns
- np.ndarray: The 3d numpy array of transition probabilities.
- np.ndarray: The reward matrix.
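A short sketch of how the two arrays fit together; the `(n_states, n_actions, n_states)` and `(n_states, n_actions)` layouts are assumptions inferred from the description above, and the default dense (non-sparse) representation is assumed:

```python
import numpy as np

def check_model(mdp):
    T, R = mdp.transition_matrix_and_rewards  # also available as mdp.T and mdp.R
    # Assumed layout: T[s, a, s'] is the probability of reaching s' when playing
    # a in s, and R[s, a] is the expected reward of the pair (s, a).
    assert T.shape == (mdp.n_states, mdp.n_actions, mdp.n_states)
    assert R.shape == (mdp.n_states, mdp.n_actions)
    # Each (state, action) pair must define a probability distribution over next states.
    assert np.allclose(T.sum(-1), 1.0)
```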
Returns
- Dict["NODE_TYPE", Tuple[float, float]]: The graph layout for the MDP. It can be customized by implementing the
custom_graph_layout
function.
Returns
- Dict[str, Any]: The dictionary containing graph metrics for the graph underlying the MDP.
Returns
- float: The sum of the reciprocals of the sub-optimality gaps.
Returns
- float: The environmental value norm in the undiscounted setting.
Returns
- Dict[str, float]: The dictionary containing all the available measures of hardness.
Returns
- Dict[str, Dict[str, Any]]: The dictionary with information about the parameters, the measures of hardness and the graph metrics.
looks for a cached hardness report in the folder that was given in input when the MDP was created. It returns None if it is unable to find one.
1151 def get_info_class(self, n: "NODE_TYPE") -> NodeInfoClass: 1152 """ 1153 Returns 1154 ------- 1155 NodeInfoClass 1156 The container class (NodeInfoClass) associated with state n. 1157 """ 1158 return self.G.nodes[n]["info_class"]
Returns
- NodeInfoClass: The container class (NodeInfoClass) associated with state n.
1160 def get_transition_distributions( 1161 self, node: "NODE_TYPE" 1162 ) -> Dict[int, Union[rv_continuous, NextStateSampler]]: 1163 """ 1164 Returns 1165 ------- 1166 Dict[int, Union[rv_continuous, NextStateSampler]] 1167 The dictionary containing the transition distributions for any action at the state given in input. 1168 """ 1169 return self.get_info_class(node).transition_distributions
Returns
- Dict[int, Union[rv_continuous, NextStateSampler]]: The dictionary containing the transition distributions for any action at the state given in input.
1171 def get_reward_distribution( 1172 self, node: "NODE_TYPE", action: "ACTION_TYPE", next_node: "NODE_TYPE" 1173 ) -> rv_continuous: 1174 """ 1175 Returns 1176 ------- 1177 rv_continuous 1178 The reward distribution for transitioning in next_node when selecting action from state. 1179 """ 1180 if (node, action, next_node) not in self._cached_reward_distributions: 1181 self._cached_reward_distributions[ 1182 node, action, next_node 1183 ] = self._get_reward_distribution( 1184 node, self._inverse_action_mapping(node, action), next_node 1185 ) 1186 return self._cached_reward_distributions[node, action, next_node]
Returns
- rv_continuous: The reward distribution for transitioning in next_node when selecting action from state.
1188 def sample_reward( 1189 self, node: "NODE_TYPE", action: "ACTION_TYPE", next_node: "NODE_TYPE" 1190 ) -> float: 1191 """ 1192 Returns 1193 ------- 1194 float 1195 A sample from the reward distribution when transitioning in next_node when selecting action from state. 1196 """ 1197 if (node, action, next_node) not in self._cached_rewards or len( 1198 self._cached_rewards[node, action, next_node] 1199 ) == 0: 1200 self._cached_rewards[node, action, next_node] = ( 1201 self.get_reward_distribution(node, action, next_node) 1202 .rvs(5000, random_state=self._rng) 1203 .tolist() 1204 ) 1205 r = self._cached_rewards[node, action, next_node].pop(0) 1206 return ( 1207 r * (self.rewards_range[1] - self.rewards_range[0]) - self.rewards_range[0] 1208 )
Returns
- float: A sample from the reward distribution when transitioning in next_node when selecting action from state.
1210 def get_measure_from_name(self, measure_name: str) -> float: 1211 """ 1212 Parameters 1213 ---------- 1214 measure_name : str 1215 the code name for the measure. The available code names can be obtained using the 1216 ``BaseMDP.get_available_hardness_measures`` function. 1217 Returns 1218 ------- 1219 float 1220 The value for the hardness measure. 1221 """ 1222 if measure_name == "diameter": 1223 return self.diameter 1224 elif measure_name in ["value_norm", "environmental_value_norm"]: 1225 return self.value_norm 1226 elif measure_name == "suboptimal_gaps": 1227 return self.sum_reciprocals_suboptimality_gaps 1228 else: 1229 raise ValueError( 1230 f"{measure_name} is not a valid hardness measure name: available ones are " 1231 + str(self.get_available_hardness_measures()) 1232 )
Parameters
- measure_name (str): the code name for the measure. The available code names can be obtained using the `BaseMDP.get_available_hardness_measures` function.
Returns
- float: The value for the hardness measure.
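Combining this with `get_available_hardness_measures`, a small sketch that collects every measure keyed by its code name (equivalent in spirit to the `measures_of_hardness` property):

```python
def hardness_profile(mdp):
    """Collect all hardness measures of an MDP keyed by their code names."""
    return {
        name: mdp.get_measure_from_name(name)
        for name in mdp.get_available_hardness_measures()
    }

# e.g. {"diameter": ..., "value_norm": ..., "suboptimal_gaps": ...}
```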
1234 def action_spec(self) -> DiscreteArray: 1235 """ 1236 Returns 1237 ------- 1238 DiscreteArray 1239 The action spec of the environment. 1240 """ 1241 return DiscreteArray(self.n_actions, name="action")
Returns
- DiscreteArray: The action spec of the environment.
1243 def observation_spec(self) -> Array: 1244 """ 1245 Returns 1246 ------- 1247 Array 1248 The observation spec of the environment. 1249 """ 1250 if self.emission_map is None: 1251 return DiscreteArray(self.n_states, name="observation") 1252 obs = self.get_observation(self.starting_nodes[0], 0) 1253 return BoundedArray(obs.shape, obs.dtype, -np.inf, np.inf, "observation")
Returns
- Array: The observation spec of the environment.
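A sketch of how the specs differ between the tabular and the emission-map case; it assumes the acme-style `DiscreteArray` exposes a `num_values` attribute, as the dm_env spec of the same name does:

```python
def describe_specs(mdp):
    a_spec = mdp.action_spec()       # DiscreteArray over the available actions
    o_spec = mdp.observation_spec()  # DiscreteArray (tabular) or BoundedArray
    if mdp.emission_map is None:
        # Tabular observations are plain state indices.
        print("actions:", a_spec.num_values, "states:", o_spec.num_values)
    else:
        # Non-tabular observations are arrays produced by the emission map.
        print("actions:", a_spec.num_values, "observation shape:", o_spec.shape)
```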
1255 def get_observation( 1256 self, node: "NODE_TYPE", h: int = None 1257 ) -> Union[int, np.ndarray]: 1258 """ 1259 Returns 1260 ------- 1261 Union[int, np.ndarray] 1262 The representation corresponding to the state given in input. Episodic MDPs also requires the current 1263 in-episode time step. 1264 """ 1265 if self.emission_map is None: 1266 return self.node_to_index[self.cur_node] 1267 return self.emission_map.get_observation(node, h)
Returns
- Union[int, np.ndarray]: The representation corresponding to the state given in input. Episodic MDPs also require the current in-episode time step.
1269 def reset(self) -> dm_env.TimeStep: 1270 """ 1271 resets the environment to a newly sampled starting state. 1272 """ 1273 self.necessary_reset = False 1274 self.h = 0 1275 self.cur_node = self.last_starting_node = self._starting_node_sampler.sample() 1276 node_info_class = self.get_info_class(self.cur_node) 1277 node_info_class.update_visitation_counts() 1278 return dm_env.restart(self.get_observation(self.cur_node, self.h))
resets the environment to a newly sampled starting state.
1280 def step(self, action: int, auto_reset=False) -> dm_env.TimeStep: 1281 """ 1282 takes a step in the MDP for the given action. When auto_reset is set to True then it automatically reset the 1283 at the end of the episodes. 1284 Returns 1285 ------- 1286 dm_env.TimeStep 1287 The TimeStep object containing the transition information. 1288 """ 1289 if auto_reset and self.necessary_reset: 1290 return self.reset() 1291 assert not self.necessary_reset 1292 1293 # This can be the in episode time step (episodic setting) or the total numuber of time steps (continuous setting) 1294 self.h += 1 1295 1296 # In case the action is a numpy array 1297 action = int(action) 1298 1299 # Moving the current state according to the action played 1300 old_node = self.cur_node 1301 self.cur_node = self.get_info_class(old_node).sample_next_state(action) 1302 self.last_edge = old_node, self.cur_node 1303 node_info_class = self.get_info_class(self.cur_node) 1304 node_info_class.update_visitation_counts(action) 1305 1306 # Calculate reward and observation 1307 reward = self.sample_reward(old_node, action, self.cur_node) 1308 observation = self.get_observation(self.cur_node, self.h) 1309 1310 # Wrapping the time step in a dm_env.TimeStep 1311 if self.is_episodic() and self.h >= self.H: 1312 self.necessary_reset = True 1313 if self.emission_map is None: 1314 observation = -1 1315 else: 1316 observation = np.zeros_like(self.observation_spec().generate_value()) 1317 return dm_env.termination(reward=reward, observation=observation) 1318 return dm_env.transition(reward=reward, observation=observation)
takes a step in the MDP for the given action. When auto_reset is set to True, it automatically resets the MDP at the end of an episode.
Returns
- dm_env.TimeStep: The TimeStep object containing the transition information.
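Since the class implements the standard `dm_env` interaction protocol, a minimal rollout sketch with uniformly random actions looks as follows:

```python
import numpy as np

def rollout(mdp, n_steps=100, seed=0):
    """Run a short random rollout, letting episodes restart automatically."""
    rng = np.random.RandomState(seed)
    ts = mdp.reset()
    total_reward = 0.0
    for _ in range(n_steps):
        action = int(rng.randint(mdp.action_spec().num_values))
        ts = mdp.step(action, auto_reset=True)
        if ts.reward is not None:  # time steps returned by reset() carry no reward
            total_reward += ts.reward
    return total_reward
```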
1320 def random_steps( 1321 self, n: int, auto_reset=False 1322 ) -> List[Tuple[dm_env.TimeStep, int]]: 1323 """ 1324 takes n a step with a random action and returns both the next step and the random action. If auto_reset is set to 1325 True than it automatically resets episodic MDPs. 1326 1327 Returns 1328 ------- 1329 dm_env.TimeStep 1330 The TimeStep object containing the transition information. 1331 int 1332 The action performed. 1333 """ 1334 1335 data = [] 1336 for _ in range(n): 1337 action = int(self._rng.randint(self.action_spec().num_values)) 1338 ts = self.step(action, auto_reset) 1339 data.append((ts, action)) 1340 return data
takes n steps with random actions and returns, for each step, both the resulting time step and the random action. If auto_reset is set to True, it automatically resets episodic MDPs.
Returns
- List[Tuple[dm_env.TimeStep, int]]: The list of pairs containing, for each of the n steps, the TimeStep object with the transition information and the action performed.
1342 def random_step(self, auto_reset=False) -> Tuple[dm_env.TimeStep, int]: 1343 """ 1344 takes a step with a random action and returns both the next step and the random action. If auto_reset is set to 1345 True than it automatically resets episodic MDPs. 1346 1347 Returns 1348 ------- 1349 dm_env.TimeStep 1350 The TimeStep object containing the transition information. 1351 int 1352 The action performed. 1353 """ 1354 action = int(self._rng.randint(self.action_spec().num_values)) 1355 ts = self.step(action, auto_reset) 1356 return ts, action
takes a step with a random action and returns both the resulting time step and the random action. If auto_reset is set to True, it automatically resets episodic MDPs.
Returns
- dm_env.TimeStep: The TimeStep object containing the transition information.
- int: The action performed.
1358 def get_visitation_counts( 1359 self, state_only=True 1360 ) -> Dict[Union["NODE_TYPE", Tuple["NODE_TYPE", "ACTION_TYPE"]], int]: 1361 """ 1362 when state_only is True it returns the visitation counts for the states and when it is False it returns the 1363 visitation counts for state action pairs. 1364 """ 1365 if state_only: 1366 return { 1367 node: self.get_info_class(node).state_visitation_count 1368 for node in self.G.nodes 1369 } 1370 return { 1371 (node, a): self.get_info_class(node).actions_visitation_count[a] 1372 for node in self.G.nodes 1373 for a in range(self.n_actions) 1374 }
when state_only is True, it returns the visitation counts for the states; when it is False, it returns the visitation counts for state-action pairs.
1376 def reset_visitation_counts(self): 1377 """ 1378 resets the visitation counts to zero for all states and state action pairs. 1379 """ 1380 for node in self.G.nodes: 1381 self.get_info_class(node).state_visitation_count = 0 1382 for a in range(self.n_actions): 1383 self.get_info_class(node).actions_visitation_count[a] = 0
resets the visitation counts to zero for all states and state action pairs.
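A short sketch combining the two helpers above: take some random steps, report the most visited states, then clear the counters:

```python
def most_visited_states(mdp, n_random_steps=500, top_k=5):
    """Take random steps and return the top_k most visited states."""
    mdp.reset()
    mdp.random_steps(n_random_steps, auto_reset=True)
    counts = mdp.get_visitation_counts(state_only=True)  # node -> visit count
    top = sorted(counts.items(), key=lambda kv: kv[1], reverse=True)[:top_k]
    mdp.reset_visitation_counts()                        # clear the counters afterwards
    return top
```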
1385 def get_value_node_labels( 1386 self: Union["ContinuousMDP", "EpisodicMDP"], V: np.ndarray = None 1387 ) -> Dict["NODE_TYPE", float]: 1388 """ 1389 returns a mapping from state to state values. By default, it uses the optimal values. 1390 """ 1391 if V is None: 1392 _, V = self.optimal_value_functions 1393 else: 1394 if isinstance(self, EpisodicMDP): 1395 h, d = V.shape 1396 assert h == self.H and d == self.n_states 1397 else: 1398 assert len(V) == self.n_states 1399 return { 1400 node: np.round( 1401 V[0, self.node_to_index[node]] 1402 if self.is_episodic() 1403 else V[self.node_to_index[node]], 1404 2, 1405 ) 1406 for node in self.G.nodes 1407 }
returns a mapping from state to state values. By default, it uses the optimal values.
Inherited Members
- dm_env._environment.Environment
- reward_spec
- discount_spec
- close