colosseum.mdp.base_finite

import abc
from typing import TYPE_CHECKING, Any, Dict, List, Tuple

import networkx as nx
import numpy as np

from colosseum.dynamic_programming import discounted_policy_iteration
from colosseum.dynamic_programming import discounted_value_iteration
from colosseum.dynamic_programming import episodic_policy_evaluation
from colosseum.dynamic_programming import episodic_value_iteration
from colosseum.dynamic_programming.utils import get_policy_from_q_values
from colosseum.mdp import BaseMDP
from colosseum.mdp.utils.mdp_creation import (
    get_continuous_form_episodic_transition_matrix_and_rewards,
)
from colosseum.mdp.utils.mdp_creation import get_episodic_graph
from colosseum.mdp.utils.mdp_creation import get_episodic_transition_matrix_and_rewards

if TYPE_CHECKING:
    from colosseum.mdp import NODE_TYPE


class EpisodicMDP(BaseMDP, abc.ABC):
    """
    The base class for episodic MDPs.
    """

    @staticmethod
    def is_episodic() -> bool:
        """
        Returns
        -------
        bool
            True if the MDP is episodic.
        """
        return True

    @property
    def H(self) -> int:
        """
        Returns
        -------
        int
            The episode length.
        """
        if self._H is None:
            self._set_time_horizon(self._input_H)
        return self._H

    @property
    def random_policy_cf(self) -> np.ndarray:
        """
        Returns
        -------
        np.ndarray
            The random uniform policy for the continuous form of the episodic MDP.
        """
        if self._random_policy_cf is None:
            self._random_policy_cf = (
                np.ones(
                    (len(self.get_episodic_graph(True).nodes), self.n_actions),
                    np.float32,
                )
                / self.n_actions
            )
        return self._random_policy_cf

    @property
    def random_policy(self) -> np.ndarray:
        """
        Returns
        -------
        np.ndarray
            The random uniform policy.
        """
        if self._random_policy is None:
            self._random_policy = (
                np.ones((self.H, self.n_states, self.n_actions), np.float32)
                / self.n_actions
            )
        return self._random_policy

    def __init__(self, H: int = None, **kwargs):
        super(EpisodicMDP, self).__init__(**kwargs)

        # Computing the time horizon
        self._input_H = H
        self._H = None

        # Episodic setting specific caching variables
        self._reachable_states = None
        self._episodic_graph = dict()
        self._continuous_form_episodic_transition_matrix_and_rewards = None
        self._episodic_transition_matrix_and_rewards = None
        self._optimal_policy_cf = dict()
        self._worst_policy_cf = dict()
        self._optimal_value_cf = None
        self._worst_value_cf = None
        self._random_value_cf = None
        self._eoar = None
        self._woar = None
        self._roar = None
        self._random_policy_cf = None
        self._random_policy = None
        self._average_optimal_episodic_reward = None
        self._average_worst_episodic_reward = None
        self._average_random_episodic_reward = None

    def _set_time_horizon(self, H: int) -> None:
        """
        Sets a meaningful minimal time horizon for the MDP.
        """
        if "Taxi" in str(type(self)):
            # it is complicated to give the same horizon to different seeds of the same MDP instance
            # for the Taxi MDP
            minimal_H = int(1.5 * self._size ** 2)
        else:
            minimal_H = (
                max(
                    max(nx.shortest_path_length(self.G, sn).values())
                    for sn in self._possible_starting_nodes
                )
                + 1
            )
        if H is None:
            self._H = minimal_H
        else:
            self._H = max(minimal_H, H)

    def _vi(self, *args):
        return episodic_value_iteration(self.H, *args)

    def _pe(self, *args):
        return episodic_policy_evaluation(self.H, *args)

    @property
    def parameters(self) -> Dict[str, Any]:
        """
        Returns
        -------
        Dict[str, Any]
            The parameters of the MDP.
        """
        parameters = super(EpisodicMDP, self).parameters
        if not self._exclude_horizon_from_parameters:
            parameters["H"] = self.H
        return parameters

    @property
    def reachable_states(self) -> List[Tuple[int, "NODE_TYPE"]]:
        """
        Returns
        -------
        List[Tuple[int, "NODE_TYPE"]]
            The pairs of in-episode time step and state that can be reached at that time step.
        """
        if self._reachable_states is None:
            self._reachable_states = [
                (h, self.node_to_index[n])
                for h, n in self.get_episodic_graph(False).nodes
            ]
        return self._reachable_states

    @property
    def T_cf(self) -> np.ndarray:
        """
        An alias for the continuous form of the transition matrix.
        """
        return self.continuous_form_episodic_transition_matrix_and_rewards[0]

    @property
    def R_cf(self) -> np.ndarray:
        """
        An alias for the continuous form of the rewards matrix.
        """
        return self.continuous_form_episodic_transition_matrix_and_rewards[1]

    @property
    def optimal_value_continuous_form(self) -> Tuple[np.ndarray, np.ndarray]:
        """
        Returns
        -------
        np.ndarray
            The q-value function of the optimal policy for the continuous form of the MDP.
        np.ndarray
            The state-value function of the optimal policy for the continuous form of the MDP.
        """
        if self._optimal_value_cf is None:
            self._optimal_value_cf = discounted_value_iteration(self.T_cf, self.R_cf)
        return self._optimal_value_cf

    @property
    def worst_value_continuous_form(self) -> Tuple[np.ndarray, np.ndarray]:
        """
        Returns
        -------
        np.ndarray
            The q-value function of the worst policy for the continuous form of the MDP.
        np.ndarray
            The state-value function of the worst policy for the continuous form of the MDP.
        """
        if self._worst_value_cf is None:
            self._worst_value_cf = discounted_value_iteration(self.T_cf, -self.R_cf)
        return self._worst_value_cf

    @property
    def random_value_continuous_form(self) -> Tuple[np.ndarray, np.ndarray]:
        """
        Returns
        -------
        np.ndarray
            The q-value function of the random uniform policy for the continuous form of the MDP.
        np.ndarray
            The state-value function of the random uniform policy for the continuous form of the MDP.
        """
        if self._random_value_cf is None:
            self._random_value_cf = discounted_policy_iteration(
                self.T_cf, self.R_cf, self.random_policy_cf
            )
        return self._random_value_cf

    @property
    def episodic_optimal_average_reward(self) -> float:
        """
        Returns
        -------
        float
            The average episodic reward for the optimal policy.
        """
        if self._eoar is None:
            _eoar = 0.0
            for sn, p in self._starting_node_sampler.next_nodes_and_probs:
                _eoar += p * self.get_optimal_policy_starting_value(sn)
            self._eoar = _eoar / self.H
        return self._eoar

    @property
    def episodic_worst_average_reward(self) -> float:
        """
        Returns
        -------
        float
            The average episodic reward for the worst policy.
        """
        if self._woar is None:
            _woar = 0.0
            for sn, p in self._starting_node_sampler.next_nodes_and_probs:
                _woar += p * self.get_worst_policy_starting_value(sn)
            self._woar = _woar / self.H
        return self._woar

    @property
    def episodic_random_average_reward(self) -> float:
        """
        Returns
        -------
        float
            The average episodic reward for the random uniform policy.
        """
        if self._roar is None:
            _roar = 0.0
            for sn, p in self._starting_node_sampler.next_nodes_and_probs:
                _roar += p * self.get_random_policy_starting_value(sn)
            self._roar = _roar / self.H
        return self._roar

    @property
    def continuous_form_episodic_transition_matrix_and_rewards(
        self,
    ) -> Tuple[np.ndarray, np.ndarray]:
        """
        Returns
        -------
        np.ndarray
            The transition 3d array for the continuous form of the MDP.
        np.ndarray
            The reward matrix for the continuous form of the MDP.
        """
        if self._continuous_form_episodic_transition_matrix_and_rewards is None:
            self._continuous_form_episodic_transition_matrix_and_rewards = (
                get_continuous_form_episodic_transition_matrix_and_rewards(
                    self.H,
                    self.get_episodic_graph(True),
                    *self.transition_matrix_and_rewards,
                    self._starting_node_sampler,
                    self.node_to_index,
                )
            )
        return self._continuous_form_episodic_transition_matrix_and_rewards

    @property
    def episodic_transition_matrix_and_rewards(self) -> Tuple[np.ndarray, np.ndarray]:
        """
        Returns
        -------
        np.ndarray
            The transition 3d array for the MDP.
        np.ndarray
            The reward matrix for the MDP.
        """
        if self._episodic_transition_matrix_and_rewards is None:
            self._episodic_transition_matrix_and_rewards = (
                get_episodic_transition_matrix_and_rewards(
                    self.H,
                    *self.transition_matrix_and_rewards,
                    self._starting_node_sampler,
                    self.node_to_index,
                )
            )
        return self._episodic_transition_matrix_and_rewards

    def get_optimal_policy_continuous_form(self, stochastic_form: bool) -> np.ndarray:
        """
        Returns
        -------
        np.ndarray
            The optimal policy computed for the continuous form.
        """
        if stochastic_form not in self._optimal_policy_cf:
            self._optimal_policy_cf[stochastic_form] = get_policy_from_q_values(
                self.optimal_value_continuous_form[0], stochastic_form
            )
        return self._optimal_policy_cf[stochastic_form]

    def get_worst_policy_continuous_form(self, stochastic_form: bool) -> np.ndarray:
        """
        Returns
        -------
        np.ndarray
            The worst policy computed for the continuous form.
        """
        if stochastic_form not in self._worst_policy_cf:
            self._worst_policy_cf[stochastic_form] = get_policy_from_q_values(
                self.worst_value_continuous_form[0], stochastic_form
            )
        return self._worst_policy_cf[stochastic_form]

    def get_random_policy_continuous_form(self, stochastic_form: bool) -> np.ndarray:
        """
        Returns
        -------
        np.ndarray
            The random uniform policy computed for the continuous form.
        """
        # Unlike the optimal and worst policies, the random policy is not cached here:
        # self._random_policy_cf already stores the uniform policy array returned by the
        # random_policy_cf property, so it cannot also serve as a per-form cache.
        return get_policy_from_q_values(
            self.random_value_continuous_form[0], stochastic_form
        )

    def get_minimal_regret_for_starting_node(self, node: "NODE_TYPE") -> float:
        """
        Returns
        -------
        float
            The minimal possible regret obtained from the given starting state.
        """
        return self.get_optimal_policy_starting_value(
            node
        ) - self.get_worst_policy_starting_value(node)

    def get_optimal_policy_starting_value(self, node: "NODE_TYPE") -> float:
        """
        Returns
        -------
        float
            The value of the given state at in-episode time step zero for the optimal policy.
        """
        return self.optimal_value_functions[1][0, self.node_to_index[node]]

    def get_worst_policy_starting_value(self, node: "NODE_TYPE") -> float:
        """
        Returns
        -------
        float
            The value of the given state at in-episode time step zero for the worst policy.
        """
        return self.worst_value_functions[1][0, self.node_to_index[node]]

    def get_random_policy_starting_value(self, node: "NODE_TYPE") -> float:
        """
        Returns
        -------
        float
            The value of the given state at in-episode time step zero for the random uniform policy.
        """
        return self.random_value_functions[1][0, self.node_to_index[node]]

    def get_episodic_graph(self, remove_labels: bool) -> nx.DiGraph:
        """
        Returns
        -------
        nx.DiGraph
            The graph corresponding to the state space augmented with the in-episode time step. The
            labels that mark the nodes can optionally be removed.
        """
        if remove_labels not in self._episodic_graph:
            self._episodic_graph[remove_labels] = get_episodic_graph(
                self.G, self.H, self.node_to_index, self.starting_nodes, remove_labels
            )
        return self._episodic_graph[remove_labels]

    def get_grid_representation(self, node: "NODE_TYPE", h: int = None) -> np.ndarray:
        """
        Returns
        -------
        np.ndarray
            An ASCII representation of the state given in input stored as a numpy array.
        """
        if h is None:
            h = self.h
        grid = self._get_grid_representation(node)
        while grid.shape[1] < 2 + len(str(self.h)):
            adder = np.zeros((grid.shape[1], 1), dtype=str)
            adder[:] = "X"
            grid = np.hstack((grid, adder))
        title = np.array(
            [" "] * grid.shape[1] + ["_"] * grid.shape[1], dtype=str
        ).reshape(2, -1)
        title[0, 0] = "H"
        title[0, 1] = "="
        for i, l in enumerate(str(h)):
            title[0, 2 + i] = l
        return np.vstack((title, grid))
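
The class above only defines the episodic interface; concrete MDP families in colosseum subclass it. The short sketch below shows how the members documented in this module would typically be accessed. It is a usage sketch only: the concrete class RiverSwimEpisodic and its seed/size constructor arguments are illustrative assumptions and may differ from the actual API of the MDP families shipped with the package.

# Usage sketch (illustrative; the concrete subclass and its constructor are assumptions).
from colosseum.mdp.river_swim import RiverSwimEpisodic  # assumed concrete EpisodicMDP subclass

mdp = RiverSwimEpisodic(seed=0, size=5)  # assumed constructor parameters

# The horizon is resolved lazily from the H given at construction (or from the MDP structure).
print("H =", mdp.H)

# Episodic transition and reward arrays (see episodic_transition_matrix_and_rewards above).
T, R = mdp.episodic_transition_matrix_and_rewards

# Average episodic rewards of the optimal, worst and random uniform policies.
print(
    mdp.episodic_optimal_average_reward,
    mdp.episodic_worst_average_reward,
    mdp.episodic_random_average_reward,
)

# Deterministic optimal policy computed on the continuous (time-augmented) form of the MDP.
pi_cf = mdp.get_optimal_policy_continuous_form(stochastic_form=False)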

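Several members above (T_cf, R_cf, the *_continuous_form values and policies, random_policy_cf) operate on the continuous form of the episodic MDP: the state space is augmented with the in-episode time step via get_episodic_graph, and the resulting finite MDP is planned with the discounted routines imported at the top of the module. The snippet below is a simplified, self-contained illustration of that construction for a dense toy MDP; it is not colosseum's get_continuous_form_episodic_transition_matrix_and_rewards, which, among other differences, builds the augmented state set from the nodes of get_episodic_graph(True) rather than from all (h, s) pairs.

import numpy as np

def toy_continuous_form(T, R, H, start_dist):
    """
    Simplified illustration of a time-augmented "continuous form".

    Every pair (h, s) with h < H becomes a state of a stationary MDP; at the
    last in-episode time step the process resets to the starting distribution,
    so infinite-horizon planners (e.g. discounted value iteration) apply directly.
    """
    n_states, n_actions, _ = T.shape

    def idx(h, s):  # flat index of the augmented state (h, s)
        return h * n_states + s

    n_cf = H * n_states
    T_cf = np.zeros((n_cf, n_actions, n_cf), np.float32)
    R_cf = np.zeros((n_cf, n_actions), np.float32)
    for h in range(H):
        for s in range(n_states):
            R_cf[idx(h, s)] = R[s]
            for a in range(n_actions):
                if h < H - 1:
                    # Within the episode, the time step advances deterministically.
                    T_cf[idx(h, s), a, idx(h + 1, 0) : idx(h + 1, n_states)] = T[s, a]
                else:
                    # At the end of the episode, reset to a starting state.
                    for s0, p in enumerate(start_dist):
                        T_cf[idx(h, s), a, idx(0, s0)] = p
    return T_cf, R_cf

With T of shape (n_states, n_actions, n_states), R of shape (n_states, n_actions), a horizon H and a starting distribution over states, the resulting pair plays the same role as the T_cf and R_cf properties above, which the listing passes directly to discounted_value_iteration and discounted_policy_iteration.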