colosseum.mdp.minigrid_empty.base

import abc
from dataclasses import dataclass
from enum import IntEnum
from itertools import product
from typing import TYPE_CHECKING, Any, Dict, List, Tuple, Type, Union

import numpy as np
from scipy.stats import beta, rv_continuous

from colosseum.mdp import BaseMDP
from colosseum.mdp.utils.custom_samplers import NextStateSampler
from colosseum.utils.miscellanea import (
    check_distributions,
    deterministic,
    get_dist,
    rounding_nested_structure,
)

if TYPE_CHECKING:
    from colosseum.mdp import ACTION_TYPE, NODE_TYPE


class MiniGridEmptyAction(IntEnum):
    """The actions available in the MiniGridEmpty MDP."""

    MoveForward = 0
    """Move the agent forward."""
    TurnRight = 1
    """Turn the agent towards the right."""
    TurnLeft = 2
    """Turn the agent towards the left."""


class MiniGridEmptyDirection(IntEnum):
    """
    The directions the agent can face in the MiniGridEmpty MDP.
    """

    UP = 0
    RIGHT = 1
    DOWN = 2
    LEFT = 3


@dataclass(frozen=True)
class MiniGridEmptyNode:
    """
    The node for the MiniGridEmpty MDP.
    """

    X: int
    """x coordinate."""
    Y: int
    """y coordinate."""
    Dir: MiniGridEmptyDirection
    """The direction the agent is facing."""

    def __str__(self):
        return f"X={self.X},Y={self.Y},Dir={self.Dir.name}"


class MiniGridEmptyMDP(BaseMDP, abc.ABC):
    """
    The base class for the MiniGridEmpty family.
    """

    @staticmethod
    def get_unique_symbols() -> List[str]:
        return [" ", ">", "<", "v", "^", "G"]

    @staticmethod
    def does_seed_change_MDP_structure() -> bool:
        return True

    @staticmethod
    def sample_mdp_parameters(
        n: int, is_episodic: bool, seed: int = None
    ) -> List[Dict[str, Any]]:
        rng = np.random.RandomState(np.random.randint(10_000) if seed is None else seed)
        samples = []
        for _ in range(n):
            p_rand, p_lazy, _ = 0.9 * rng.dirichlet([0.2, 0.2, 5])
            sample = dict(
                size=int(np.minimum(5 + (14 / (8 * rng.random() + 1.0)), 20))
                if is_episodic
                else int(1.5 * np.minimum(5 + (14 / (8 * rng.random() + 1.0)), 20)),
                n_starting_states=rng.randint(1, 5),
                p_rand=p_rand,
                p_lazy=p_lazy,
                make_reward_stochastic=rng.choice([True, False]),
                reward_variance_multiplier=2 * rng.random() + 0.005,
            )
            sample["p_rand"] = None if sample["p_rand"] < 0.01 else sample["p_rand"]
            sample["p_lazy"] = None if sample["p_lazy"] < 0.01 else sample["p_lazy"]

            if sample["make_reward_stochastic"]:
                sample["optimal_distribution"] = (
                    "beta",
                    (
                        sample["reward_variance_multiplier"],
                        sample["reward_variance_multiplier"]
                        * (sample["size"] ** 2 - 1),
                    ),
                )
                sample["other_distribution"] = (
                    "beta",
                    (
                        sample["reward_variance_multiplier"]
                        * (sample["size"] ** 2 - 1),
                        sample["reward_variance_multiplier"],
                    ),
                )
            else:
                sample["optimal_distribution"] = ("deterministic", (1.0,))
                sample["other_distribution"] = ("deterministic", (0.0,))

            samples.append(rounding_nested_structure(sample))
        return samples

    @staticmethod
    def get_node_class() -> Type["NODE_TYPE"]:
        return MiniGridEmptyNode

    def get_gin_parameters(self, index: int) -> str:
        prms = dict(
            size=self._size,
            n_starting_states=self._n_starting_states,
            make_reward_stochastic=self._make_reward_stochastic,
            reward_variance_multiplier=self._reward_variance_multiplier,
            optimal_distribution=(
                self._optimal_distribution.dist.name,
                self._optimal_distribution.args,
            ),
            other_distribution=(
                self._other_distribution.dist.name,
                self._other_distribution.args,
            ),
        )

        if self._p_rand is not None:
            prms["p_rand"] = self._p_rand
        if self._p_lazy is not None:
            prms["p_lazy"] = self._p_lazy

        return MiniGridEmptyMDP.produce_gin_file_from_mdp_parameters(
            prms, type(self).__name__, index
        )

    @property
    def n_actions(self) -> int:
        return len(MiniGridEmptyAction)

    def _get_next_nodes_parameters(
        self, node: "NODE_TYPE", action: "ACTION_TYPE"
    ) -> Tuple[Tuple[dict, float], ...]:
        d = node.Dir
        # Turning changes only the direction (modulo the four directions).
        if action == MiniGridEmptyAction.TurnRight:
            return (
                (
                    dict(X=node.X, Y=node.Y, Dir=MiniGridEmptyDirection((d + 1) % 4)),
                    1.0,
                ),
            )
        if action == MiniGridEmptyAction.TurnLeft:
            return (
                (
                    dict(X=node.X, Y=node.Y, Dir=MiniGridEmptyDirection((d - 1) % 4)),
                    1.0,
                ),
            )
        # Moving forward is deterministic and clipped at the grid border.
        if action == MiniGridEmptyAction.MoveForward:
            if d == MiniGridEmptyDirection.UP:
                return (
                    (dict(X=node.X, Y=min(node.Y + 1, self._size - 1), Dir=d), 1.0),
                )
            if d == MiniGridEmptyDirection.RIGHT:
                return (
                    (dict(X=min(self._size - 1, node.X + 1), Y=node.Y, Dir=d), 1.0),
                )
            if d == MiniGridEmptyDirection.DOWN:
                return ((dict(X=node.X, Y=max(node.Y - 1, 0), Dir=d), 1.0),)
            if d == MiniGridEmptyDirection.LEFT:
                return ((dict(X=max(0, node.X - 1), Y=node.Y, Dir=d), 1.0),)

    def _get_reward_distribution(
        self, node: "NODE_TYPE", action: "ACTION_TYPE", next_node: "NODE_TYPE"
    ) -> rv_continuous:
        return (
            self._optimal_distribution
            if next_node.X == self.goal_position[0]
            and next_node.Y == self.goal_position[1]
            else self._other_distribution
        )

    def get_positions_on_side(self, side: int) -> List[Tuple[int, int]]:
        nodes = []
        for i in range(self._size):
            for j in range(self._size):
                if side == 0:  # Starting from the left
                    nodes.append((i, j))
                elif side == 1:  # Starting from the south
                    nodes.append((j, i))
                elif side == 2:  # Starting from the right
                    nodes.append((self._size - 1 - i, self._size - 1 - j))
                else:  # Starting from the north
                    nodes.append((self._size - 1 - j, self._size - 1 - i))
        return nodes

    @property
    def _possible_starting_nodes(self) -> List["NODE_TYPE"]:
        return [
            MiniGridEmptyNode(x, y, MiniGridEmptyDirection(d))
            for (x, y), d in product(self.__possible_starting_nodes, range(4))
        ]

    def _get_starting_node_sampler(self) -> NextStateSampler:
        # The goal is placed on the side opposite to the starting side.
        self.side_start = self._rng.randint(4)
        self.goal_position = self.get_positions_on_side((self.side_start + 2) % 4)[
            : self._size
        ][self._rng.randint(self._size)]
        self.__possible_starting_nodes = self.get_positions_on_side(self.side_start)[
            : self._size
        ]
        self._rng.shuffle(self.__possible_starting_nodes)
        starting_nodes = self.__possible_starting_nodes[: self._n_starting_states]
        return NextStateSampler(
            next_nodes=[
                MiniGridEmptyNode(x, y, MiniGridEmptyDirection(self._rng.randint(4)))
                for x, y in starting_nodes
            ],
            probs=[1 / len(starting_nodes) for _ in range(len(starting_nodes))],
            seed=self._produce_random_seed(),
        )

    def _check_parameters_in_input(self):
        super(MiniGridEmptyMDP, self)._check_parameters_in_input()

        assert self._size > 2, "the size should be greater than 2"
        assert self._n_starting_states > 0

        dists = [
            self._optimal_distribution,
            self._other_distribution,
        ]
        check_distributions(
            dists,
            self._make_reward_stochastic,
        )

    def _get_grid_representation(self, node: "NODE_TYPE") -> np.ndarray:
        grid = np.zeros((self._size, self._size), dtype=str)
        grid[:, :] = " "
        grid[self.goal_position[1], self.goal_position[0]] = "G"
        if self.cur_node.Dir == MiniGridEmptyDirection.UP:
            grid[self.cur_node.Y, self.cur_node.X] = "^"
        elif self.cur_node.Dir == MiniGridEmptyDirection.RIGHT:
            grid[self.cur_node.Y, self.cur_node.X] = ">"
        elif self.cur_node.Dir == MiniGridEmptyDirection.DOWN:
            grid[self.cur_node.Y, self.cur_node.X] = "v"
        elif self.cur_node.Dir == MiniGridEmptyDirection.LEFT:
            grid[self.cur_node.Y, self.cur_node.X] = "<"
        # Flip vertically so that the Y axis points upward in the printed grid.
        return grid[::-1, :]

    @property
    def parameters(self) -> Dict[str, Any]:
        return {
            **super(MiniGridEmptyMDP, self).parameters,
            **dict(
                size=self._size,
                n_starting_states=self._n_starting_states,
                optimal_distribution=self._optimal_distribution,
                other_distribution=self._other_distribution,
            ),
        }

    def __init__(
        self,
        seed: int,
        size: int,
        n_starting_states: int = 1,
        optimal_distribution: Union[Tuple, rv_continuous] = None,
        other_distribution: Union[Tuple, rv_continuous] = None,
        make_reward_stochastic=False,
        reward_variance_multiplier: float = 1.0,
        **kwargs,
    ):
        """
        Parameters
        ----------
        seed : int
            The seed used for sampling rewards and next states.
        size : int
            The size of the grid.
        n_starting_states : int
            The number of possible starting states.
        optimal_distribution : Union[Tuple, rv_continuous]
            The distribution of the highly rewarding state. It can be passed either as a tuple
            containing Beta parameters or as an rv_continuous object.
        other_distribution : Union[Tuple, rv_continuous]
            The distribution of the other states. It can be passed either as a tuple containing
            Beta parameters or as an rv_continuous object.
        make_reward_stochastic : bool
            If True, the rewards of the MDP will be stochastic. By default, it is set to False.
        reward_variance_multiplier : float
            A constant that can be used to increase the variance of the reward distributions
            without changing their means. The lower the value, the higher the variance. By
            default, it is set to 1.
        """

        if isinstance(optimal_distribution, tuple):
            optimal_distribution = get_dist(
                optimal_distribution[0], optimal_distribution[1]
            )
        if isinstance(other_distribution, tuple):
            other_distribution = get_dist(other_distribution[0], other_distribution[1])

        self._n_starting_states = n_starting_states
        self._size = size

        dists = [
            optimal_distribution,
            other_distribution,
        ]
        if dists.count(None) == 0:
            self._optimal_distribution = optimal_distribution
            self._other_distribution = other_distribution
        else:
            if make_reward_stochastic:
                self._other_distribution = beta(
                    reward_variance_multiplier,
                    reward_variance_multiplier * (size ** 2 - 1),
                )
                self._optimal_distribution = beta(
                    reward_variance_multiplier * (size ** 2 - 1),
                    reward_variance_multiplier,
                )
            else:
                self._optimal_distribution = deterministic(1.0)
                self._other_distribution = deterministic(0.0)

        super(MiniGridEmptyMDP, self).__init__(
            seed=seed,
            reward_variance_multiplier=reward_variance_multiplier,
            make_reward_stochastic=make_reward_stochastic,
            **kwargs,
        )
class MiniGridEmptyMDP(colosseum.mdp.base.BaseMDP, abc.ABC):
The base class for the MiniGridEmpty family.

MiniGridEmptyMDP(seed: int, size: int, n_starting_states: int = 1, optimal_distribution: Union[Tuple, scipy.stats._distn_infrastructure.rv_continuous] = None, other_distribution: Union[Tuple, scipy.stats._distn_infrastructure.rv_continuous] = None, make_reward_stochastic=False, reward_variance_multiplier: float = 1.0, **kwargs)
Parameters
  • seed (int): The seed used for sampling rewards and next states.
  • size (int): The size of the grid.
  • n_starting_states (int): The number of possible starting states.
  • optimal_distribution (Union[Tuple, rv_continuous]): The distribution of the highly rewarding state. It can be passed either as a tuple containing Beta parameters or as an rv_continuous object.
  • other_distribution (Union[Tuple, rv_continuous]): The distribution of the other states. It can be passed either as a tuple containing Beta parameters or as an rv_continuous object.
  • make_reward_stochastic (bool): If True, the rewards of the MDP will be stochastic. By default, it is set to False.
  • reward_variance_multiplier (float): A constant that can be used to increase the variance of the reward distributions without changing their means. The lower the value, the higher the variance. By default, it is set to 1.
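
MiniGridEmptyMDP is abstract, so construction goes through a concrete subclass. A minimal sketch, assuming the package exposes an episodic variant named MiniGridEmptyEpisodic (that class is an assumption here; it is not defined in this module):

from colosseum.mdp.minigrid_empty import MiniGridEmptyEpisodic  # assumed subclass

mdp = MiniGridEmptyEpisodic(
    seed=0,
    size=5,
    n_starting_states=2,
    make_reward_stochastic=True,       # Beta rewards; deterministic 1/0 otherwise
    reward_variance_multiplier=1.0,
)
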
@staticmethod
def get_unique_symbols() -> List[str]:
Returns
  • List[str]: The unique symbols of the grid representation of the MDP.
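
The symbols are the empty cell, the agent facing RIGHT/LEFT/DOWN/UP, and the goal. A small sketch of the rendering convention used by _get_grid_representation, which indexes the grid as (row=Y, column=X) and flips it vertically so that Y grows upward when printed:

import numpy as np

grid = np.full((3, 3), " ", dtype=str)
grid[2, 2] = "G"  # goal at X=2, Y=2: top-right corner after the flip
grid[0, 1] = "^"  # agent at X=1, Y=0 facing UP: bottom row after the flip
print("\n".join("".join(row) for row in grid[::-1]))
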
@staticmethod
def does_seed_change_MDP_structure() -> bool:
Returns
  • bool: True if changing the seed changes the transition matrix and/or the rewards matrix. This may happen, for example, when there are fewer starting states than possible ones and the effective starting states are picked randomly based on the seed.
@staticmethod
def sample_mdp_parameters(n: int, is_episodic: bool, seed: int = None) -> List[Dict[str, Any]]:
Returns
  • List[Dict[str, Any]]: n sampled parameters that can be used to construct an MDP in a reasonable amount of time.
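
Two details of the sampler are worth spelling out. First, p_rand and p_lazy are drawn as 0.9 * Dirichlet([0.2, 0.2, 5]), which concentrates most of the mass on the discarded third component, so both probabilities are typically small and are replaced by None below 0.01. Second, the Beta parameters are built so that reward_variance_multiplier changes the variance but not the mean. A quick check of both claims:

import numpy as np
from scipy.stats import beta

rng = np.random.RandomState(0)
p_rand, p_lazy, _ = 0.9 * rng.dirichlet([0.2, 0.2, 5])
print(p_rand + p_lazy < 0.9)  # True: the third component absorbs the rest

# For beta(c, c * (size**2 - 1)) the mean is 1 / size**2 for every c,
# while the variance shrinks as c grows.
size = 5
for c in (0.1, 1.0, 10.0):
    d = beta(c, c * (size ** 2 - 1))
    print(round(d.mean(), 4), round(d.var(), 6))  # mean is 0.04 for all c
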
def get_gin_parameters(self, index: int) -> str:
Returns
  • str: The gin config of the MDP instance.
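
For reference, the prms dict assembled by this method takes the following shape before being handed to produce_gin_file_from_mdp_parameters (illustrative values for a deterministic-reward instance with p_rand and p_lazy unset):

prms = dict(
    size=5,
    n_starting_states=2,
    make_reward_stochastic=False,
    reward_variance_multiplier=1.0,
    optimal_distribution=("deterministic", (1.0,)),
    other_distribution=("deterministic", (0.0,)),
)
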
n_actions: int
Returns
  • int: The number of available actions.
def get_positions_on_side(self, side: int) -> List[Tuple[int, int]]:
Returns
  • List[Tuple[int, int]]: All grid positions, ordered so that the cells on the given side come first (0: left, 1: south, 2: right, 3: north).
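
A standalone sketch of the ordering logic, mirroring the method body, so the side convention can be checked without constructing an MDP; callers slice off the first size entries, i.e. the cells of the requested edge:

def positions_on_side(size: int, side: int):
    nodes = []
    for i in range(size):
        for j in range(size):
            if side == 0:  # left edge first
                nodes.append((i, j))
            elif side == 1:  # south edge first
                nodes.append((j, i))
            elif side == 2:  # right edge first
                nodes.append((size - 1 - i, size - 1 - j))
            else:  # north edge first
                nodes.append((size - 1 - j, size - 1 - i))
    return nodes

print(positions_on_side(3, 0)[:3])  # [(0, 0), (0, 1), (0, 2)]: left edge
print(positions_on_side(3, 2)[:3])  # [(2, 2), (2, 1), (2, 0)]: right edge
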
parameters: Dict[str, Any]
Returns
  • Dict[str, Any]: The parameters of the MDP.
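
Continuing the construction sketch from the Parameters section (MiniGridEmptyEpisodic is an assumed subclass, not defined in this module), the property merges the BaseMDP parameters with the family-specific ones:

from colosseum.mdp.minigrid_empty import MiniGridEmptyEpisodic  # assumed subclass

mdp = MiniGridEmptyEpisodic(seed=0, size=5)
print(mdp.parameters["size"])               # 5
print(mdp.parameters["n_starting_states"])  # 1, the default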