colosseum.mdp.minigrid_rooms.base

  1import abc
  2from dataclasses import asdict, dataclass
  3from enum import IntEnum
  4from itertools import product
  5from typing import TYPE_CHECKING, Any, Dict, List, Tuple, Type, Union
  6
  7import numpy as np
  8from scipy.stats import beta, rv_continuous
  9
 10from colosseum.mdp import BaseMDP
 11from colosseum.mdp.utils.custom_samplers import NextStateSampler
 12from colosseum.utils.miscellanea import (
 13    check_distributions,
 14    deterministic,
 15    get_dist,
 16    rounding_nested_structure,
 17)
 18
 19if TYPE_CHECKING:
 20    from colosseum.mdp import ACTION_TYPE, NODE_TYPE
 21
 22
 23class MiniGridRoomsAction(IntEnum):
 24    """
 25    The actions available in the MiniGridRooms MDP.
 26    """
 27
 28    MoveForward = 0
 29    TurnRight = 1
 30    TurnLeft = 2
 31
 32
 33class MiniGridRoomsDirection(IntEnum):
 34    """The possible agent_directions in the MiniGridRooms MDP."""
 35
 36    UP = 0
 37    RIGHT = 1
 38    DOWN = 2
 39    LEFT = 3
 40
 41    def grid_movement(self) -> np.array:
 42        """:returns the effect caused by each action in grid space."""
 43        if self == MiniGridRoomsDirection.UP:
 44            return np.array((0, 1))
 45        elif self == MiniGridRoomsDirection.DOWN:
 46            return np.array((0, -1))
 47        elif self == MiniGridRoomsDirection.RIGHT:
 48            return np.array((1, 0))
 49        else:
 50            return np.array((-1, 0))
 51
 52
 53@dataclass(frozen=True)
 54class MiniGridRoomsNode:
 55    """
 56    The node for the MiniGrid-Rooms MDP.
 57    """
 58
 59    X: int
 60    """x coordinate."""
 61    Y: int
 62    """y coordinate."""
 63    Dir: MiniGridRoomsDirection
 64    """The direction the agent is facing."""
 65
 66    def __str__(self):
 67        return f"X={self.X},Y={self.Y},Dir={MiniGridRoomsDirection(self.Dir).name}"
 68
 69
 70class MiniGridRoomsMDP(BaseMDP, abc.ABC):
 71    """
 72    The base class for the MiniGrid-Rooms family.
 73    """
 74
 75    @staticmethod
 76    def get_unique_symbols() -> List[str]:
 77        return [" ", ">", "<", "v", "^", "G", "W"]
 78
 79    @staticmethod
 80    def does_seed_change_MDP_structure() -> bool:
 81        return True
 82
 83    @staticmethod
 84    def sample_mdp_parameters(
 85        n: int, is_episodic: bool, seed: int = None
 86    ) -> List[Dict[str, Any]]:
 87        rng = np.random.RandomState(np.random.randint(10_000) if seed is None else seed)
 88        samples = []
 89        for _ in range(n):
 90            p_rand, p_lazy, _ = 0.9 * rng.dirichlet([0.2, 0.2, 5])
 91            n_rooms, room_size, _ = rng.dirichlet([0.2, 0.2, 1])
 92            n_rooms = min(9, (2 * n_rooms + 2).astype(int) ** 2)
 93            room_size = min(9, (7.0 * room_size + 3).astype(int))
 94            if is_episodic:
 95                room_size = max(room_size - 3, 3)
 96            sample = dict(
 97                room_size=room_size,
 98                n_rooms=n_rooms,
 99                n_starting_states=rng.randint(1, 5),
100                p_rand=p_rand,
101                p_lazy=p_lazy,
102                make_reward_stochastic=rng.choice([True, False]),
103                reward_variance_multiplier=2 * rng.random() + 0.005,
104            )
105            sample["p_rand"] = None if sample["p_rand"] < 0.01 else sample["p_rand"]
106            sample["p_lazy"] = None if sample["p_lazy"] < 0.01 else sample["p_lazy"]
107
108            if sample["make_reward_stochastic"]:
109                size = int(sample["room_size"] * sample["n_rooms"] ** 0.5)
110                sample["optimal_distribution"] = (
111                    "beta",
112                    (
113                        sample["reward_variance_multiplier"],
114                        sample["reward_variance_multiplier"] * (size ** 2 - 1),
115                    ),
116                )
117                sample["other_distribution"] = (
118                    "beta",
119                    (
120                        sample["reward_variance_multiplier"] * (size ** 2 - 1),
121                        sample["reward_variance_multiplier"],
122                    ),
123                )
124            else:
125                sample["optimal_distribution"] = ("deterministic", (1.0,))
126                sample["other_distribution"] = ("deterministic", (0.0,))
127
128            samples.append(rounding_nested_structure(sample))
129        return samples
130
131    @staticmethod
132    def get_node_class() -> Type["NODE_TYPE"]:
133        return MiniGridRoomsNode
134
135    def get_gin_parameters(self, index: int) -> str:
136        prms = dict(
137            room_size=self._room_size,
138            n_rooms=self._n_rooms,
139            n_starting_states=self._n_starting_states,
140            make_reward_stochastic=self._make_reward_stochastic,
141            reward_variance_multiplier=self._reward_variance_multiplier,
142            optimal_distribution=(
143                self._optimal_distribution.dist.name,
144                self._optimal_distribution.args,
145            ),
146            other_distribution=(
147                self._other_distribution.dist.name,
148                self._other_distribution.args,
149            ),
150        )
151
152        if self._p_rand is not None:
153            prms["p_rand"] = self._p_rand
154        if self._p_lazy is not None:
155            prms["p_lazy"] = self._p_lazy
156
157        return MiniGridRoomsMDP.produce_gin_file_from_mdp_parameters(
158            prms, type(self).__name__, index
159        )
160
161    @property
162    def n_actions(self) -> int:
163        return len(MiniGridRoomsAction)
164
165    @property
166    def _admissible_coordinate(self) -> list:
167        rooms_per_row = int(np.sqrt(self._n_rooms))
168
169        vertical_checkers = [
170            j * (self._room_size) + j + int(np.floor(self._room_size / 2))
171            for j in range(rooms_per_row)
172        ]
173        horizontal_checkers = [
174            j * self._room_size + j - 1 for j in range(1, rooms_per_row)
175        ]
176        door_positions = list(product(horizontal_checkers, vertical_checkers)) + list(
177            product(vertical_checkers, horizontal_checkers)
178        )
179        rooms_coordinates = []
180        for room_coord in product(range(rooms_per_row), range(rooms_per_row)):
181            room = self.get_positions_coords_in_room(self._room_size, room_coord)
182            for c in room.ravel().tolist():
183                rooms_coordinates.append(tuple(c))
184        return rooms_coordinates + door_positions
185
186    @staticmethod
187    def get_positions_coords_in_room(
188        room_size: int, room_coord: Tuple[int, int]
189    ) -> np.array:
190        x_room_coord, y_room_coord = room_coord
191        nodes = np.zeros((room_size, room_size), dtype=object)
192        for i in range(room_size):
193            for j in range(room_size):
194                nodes[j, i] = (
195                    i + (room_size + 1) * x_room_coord,
196                    j + (room_size + 1) * y_room_coord,
197                )
198        nodes = nodes[::-1]
199        return nodes
200
201    def _get_next_nodes_parameters(
202        self, node: "NODE_TYPE", action: "ACTION_TYPE"
203    ) -> Tuple[Tuple[dict, float], ...]:
204        d = node.Dir
205        if action == MiniGridRoomsAction.TurnRight:
206            return ((dict(X=node.X, Y=node.Y, Dir=((d + 1) % 4)), 1.0),)
207        if action == MiniGridRoomsAction.TurnLeft:
208            return ((dict(X=node.X, Y=node.Y, Dir=((d - 1) % 4)), 1.0),)
209        if action == MiniGridRoomsAction.MoveForward:
210            if d == MiniGridRoomsDirection.UP:
211                next_coord = (node.X, node.Y + 1)
212            if d == MiniGridRoomsDirection.RIGHT:
213                next_coord = node.X + 1, node.Y
214            if d == MiniGridRoomsDirection.DOWN:
215                next_coord = node.X, node.Y - 1
216            if d == MiniGridRoomsDirection.LEFT:
217                next_coord = node.X - 1, node.Y
218            if next_coord in self._admissible_coordinate:
219                return ((dict(X=next_coord[0], Y=next_coord[1], Dir=d), 1.0),)
220            return ((asdict(node), 1.0),)
221
222    def _get_reward_distribution(
223        self, node: "NODE_TYPE", action: "ACTION_TYPE", next_node: "NODE_TYPE"
224    ) -> rv_continuous:
225        return (
226            self._optimal_distribution
227            if next_node.X == self.goal_position[0]
228            and next_node.Y == self.goal_position[1]
229            else self._other_distribution
230        )
231
232    def _get_starting_node_sampler(self) -> NextStateSampler:
233        rooms_per_row = int(np.sqrt(self._n_rooms))
234        rooms = list(product(range(rooms_per_row), range(rooms_per_row)))
235
236        corner_rooms = list(product((0, int(self._n_rooms ** 0.5) - 1), repeat=2))
237        sr = self._fast_rng.randint(0, len(corner_rooms) - 1)
238        self.starting_room = corner_rooms[sr]
239        corner_rooms.pop(sr)
240        self.goal_room = corner_rooms[self._fast_rng.randint(0, len(corner_rooms) - 1)]
241        assert self.goal_room != self.starting_room
242
243        # Random goal position from a random room
244        goal_positions = (
245            self.get_positions_coords_in_room(self._room_size, self.goal_room)
246            .ravel()
247            .tolist()
248        )
249        self._rng.shuffle(goal_positions)
250        self.goal_position = goal_positions[0]
251
252        # Random starting position from a random room
253        starting_nodes = [
254            MiniGridRoomsNode(x, y, MiniGridRoomsDirection(d))
255            for x, y in self.get_positions_coords_in_room(
256                self._room_size, self.starting_room
257            )
258            .ravel()
259            .tolist()
260            for d in range(4)
261        ]
262        self._rng.shuffle(starting_nodes)
263        self.__possible_starting_nodes = starting_nodes
264
265        return NextStateSampler(
266            next_nodes=self._possible_starting_nodes[: self._n_starting_states],
267            probs=[1 / self._n_starting_states for _ in range(self._n_starting_states)],
268            seed=self._produce_random_seed(),
269        )
270
271    def _check_parameters_in_input(self):
272        super(MiniGridRoomsMDP, self)._check_parameters_in_input()
273
274        assert self._n_rooms >= 4, "There should be at least 4 rooms"
275        assert self._room_size >= 2, "The room size must be at least 2"
276        assert int(np.sqrt(self._n_rooms)) == np.sqrt(
277            self._n_rooms
278        ), "Please provide a number of rooms with perfect square."
279
280        assert self._n_starting_states > 0
281
282        dists = [
283            self._optimal_distribution,
284            self._other_distribution,
285        ]
286        check_distributions(
287            dists,
288            self._make_reward_stochastic,
289        )
290
291    def _get_grid_representation(self, node: "NODE_TYPE") -> np.ndarray:
292        rooms_per_row = int(np.sqrt(self._n_rooms))
293        door_positions = [
294            int(self._room_size // 2) + i * (self._room_size + 1) + 1
295            for i in range(rooms_per_row)
296        ]
297        grid_size = rooms_per_row * self._room_size + rooms_per_row - 1
298        grid = np.zeros((grid_size, grid_size), dtype=str)
299
300        for x in range(1, grid_size + 1):
301            for y in range(1, grid_size + 1):
302                if (
303                    x != 0
304                    and x != (grid_size)
305                    and x % (self._room_size + 1) == 0
306                    and not y in door_positions
307                ):
308                    grid[y - 1, x - 1] = "W"
309                    continue
310                elif (
311                    y != 0
312                    and y != (grid_size)
313                    and y % (self._room_size + 1) == 0
314                    and not x in door_positions
315                ):
316                    grid[y - 1, x - 1] = "W"
317                    continue
318                else:
319                    grid[y - 1, x - 1] = " "
320
321        grid[self.goal_position[1], self.goal_position[0]] = "G"
322
323        if self.cur_node.Dir == MiniGridRoomsDirection.UP:
324            grid[self.cur_node.Y, self.cur_node.X] = "^"
325        elif self.cur_node.Dir == MiniGridRoomsDirection.RIGHT:
326            grid[self.cur_node.Y, self.cur_node.X] = ">"
327        elif self.cur_node.Dir == MiniGridRoomsDirection.DOWN:
328            grid[self.cur_node.Y, self.cur_node.X] = "v"
329        elif self.cur_node.Dir == MiniGridRoomsDirection.LEFT:
330            grid[self.cur_node.Y, self.cur_node.X] = "<"
331
332        return grid[::-1, :]
333
334    @property
335    def _possible_starting_nodes(self) -> List["NODE_TYPE"]:
336        return self.__possible_starting_nodes
337
338    @property
339    def parameters(self) -> Dict[str, Any]:
340        return {
341            **super(MiniGridRoomsMDP, self).parameters,
342            **dict(
343                room_size=self._room_size,
344                n_rooms=self._n_rooms,
345                n_starting_states=self._n_starting_states,
346                optimal_distribution=self._optimal_distribution,
347                other_distribution=self._other_distribution,
348            ),
349        }
350
351    def __init__(
352        self,
353        seed: int,
354        room_size: int,
355        n_rooms: int = 4,
356        n_starting_states: int = 2,
357        optimal_distribution: Union[Tuple, rv_continuous] = None,
358        other_distribution: Union[Tuple, rv_continuous] = None,
359        make_reward_stochastic=False,
360        reward_variance_multiplier: float = 1.0,
361        **kwargs,
362    ):
363        """
364
365        Parameters
366        ----------
367        seed : int
368            The seed used for sampling rewards and next states.
369        room_size : int
370            The size of the roorms.
371        n_rooms : int
372            The number of rooms. This must be a squared number.
373        n_starting_states : int
374            The number of possible starting states.
375        optimal_distribution : Union[Tuple, rv_continuous]
376            The distribution of the highly rewarding state. It can be either passed as a tuple containing Beta parameters
377            or as a rv_continuous object.
378        other_distribution : Union[Tuple, rv_continuous]
379            The distribution of the other states. It can be either passed as a tuple containing Beta parameters or as a
380            rv_continuous object.
381        make_reward_stochastic : bool
382            If True, the rewards of the MDP will be stochastic. By default, it is set to False.
383        reward_variance_multiplier : float
384            A constant that can be used to increase the variance of the reward distributions without changing their means.
385            The lower the value, the higher the variance. By default, it is set to 1.
386        """
387
388        if type(optimal_distribution) == tuple:
389            optimal_distribution = get_dist(
390                optimal_distribution[0], optimal_distribution[1]
391            )
392        if type(other_distribution) == tuple:
393            other_distribution = get_dist(other_distribution[0], other_distribution[1])
394
395        self._n_starting_states = n_starting_states
396        self._room_size = room_size
397        self._n_rooms = n_rooms
398
399        dists = [
400            optimal_distribution,
401            other_distribution,
402        ]
403        if dists.count(None) == 0:
404            self._optimal_distribution = optimal_distribution
405            self._other_distribution = other_distribution
406        else:
407            if make_reward_stochastic:
408                size = int(room_size * n_rooms ** 0.5)
409                self._other_distribution = beta(
410                    reward_variance_multiplier,
411                    reward_variance_multiplier * (size ** 2 - 1),
412                )
413                self._optimal_distribution = beta(
414                    reward_variance_multiplier * (size ** 2 - 1),
415                    reward_variance_multiplier,
416                )
417            else:
418                self._optimal_distribution = deterministic(1.0)
419                self._other_distribution = deterministic(0.0)
420
421        super(MiniGridRoomsMDP, self).__init__(
422            seed=seed,
423            reward_variance_multiplier=reward_variance_multiplier,
424            make_reward_stochastic=make_reward_stochastic,
425            **kwargs,
426        )
class MiniGridRoomsAction(enum.IntEnum):
class MiniGridRoomsAction(IntEnum):
    """The discrete actions an agent can take in the MiniGridRooms MDP."""

    # Try to advance one cell along the current heading.
    MoveForward = 0
    # Rotate the heading by one step in increasing direction index.
    TurnRight = 1
    # Rotate the heading by one step in decreasing direction index.
    TurnLeft = 2

The actions available in the MiniGridRooms MDP.

Inherited Members
enum.Enum
name
value
builtins.int
conjugate
bit_length
to_bytes
from_bytes
as_integer_ratio
real
imag
numerator
denominator
class MiniGridRoomsDirection(enum.IntEnum):
class MiniGridRoomsDirection(IntEnum):
    """The directions the agent can face in the MiniGridRooms MDP."""

    UP = 0
    RIGHT = 1
    DOWN = 2
    LEFT = 3

    def grid_movement(self) -> np.ndarray:
        """
        Returns
        -------
        np.ndarray
            The (dx, dy) displacement in grid space of one step taken in this direction.
        """
        # The annotation is np.ndarray (the type); np.array is only the factory function.
        if self == MiniGridRoomsDirection.UP:
            return np.array((0, 1))
        if self == MiniGridRoomsDirection.DOWN:
            return np.array((0, -1))
        if self == MiniGridRoomsDirection.RIGHT:
            return np.array((1, 0))
        return np.array((-1, 0))

The possible directions the agent can face in the MiniGridRooms MDP.

def grid_movement(self) -> numpy.ndarray:
42    def grid_movement(self) -> np.array:
43        """:returns the effect caused by each action in grid space."""
44        if self == MiniGridRoomsDirection.UP:
45            return np.array((0, 1))
46        elif self == MiniGridRoomsDirection.DOWN:
47            return np.array((0, -1))
48        elif self == MiniGridRoomsDirection.RIGHT:
49            return np.array((1, 0))
50        else:
51            return np.array((-1, 0))

Returns the (dx, dy) displacement in grid space of one step taken in this direction.

Inherited Members
enum.Enum
name
value
builtins.int
conjugate
bit_length
to_bytes
from_bytes
as_integer_ratio
real
imag
numerator
denominator
@dataclass(frozen=True)
class MiniGridRoomsNode:
@dataclass(frozen=True)
class MiniGridRoomsNode:
    """
    An immutable state of the MiniGrid-Rooms MDP: a grid cell together with the agent heading.
    """

    X: int
    """Horizontal grid coordinate."""
    Y: int
    """Vertical grid coordinate."""
    Dir: MiniGridRoomsDirection
    """Heading of the agent."""

    def __str__(self):
        # Dir may be stored as a plain int, so it is converted back to the enum to get its name.
        heading = MiniGridRoomsDirection(self.Dir).name
        return f"X={self.X},Y={self.Y},Dir={heading}"

The node for the MiniGrid-Rooms MDP.

MiniGridRoomsNode( X: int, Y: int, Dir: colosseum.mdp.minigrid_rooms.base.MiniGridRoomsDirection)
X: int

x coordinate.

Y: int

y coordinate.

The direction the agent is facing.

class MiniGridRoomsMDP(colosseum.mdp.base.BaseMDP, abc.ABC):
 71class MiniGridRoomsMDP(BaseMDP, abc.ABC):
 72    """
 73    The base class for the MiniGrid-Rooms family.
 74    """
 75
 76    @staticmethod
 77    def get_unique_symbols() -> List[str]:
 78        return [" ", ">", "<", "v", "^", "G", "W"]
 79
 80    @staticmethod
 81    def does_seed_change_MDP_structure() -> bool:
 82        return True
 83
 84    @staticmethod
 85    def sample_mdp_parameters(
 86        n: int, is_episodic: bool, seed: int = None
 87    ) -> List[Dict[str, Any]]:
 88        rng = np.random.RandomState(np.random.randint(10_000) if seed is None else seed)
 89        samples = []
 90        for _ in range(n):
 91            p_rand, p_lazy, _ = 0.9 * rng.dirichlet([0.2, 0.2, 5])
 92            n_rooms, room_size, _ = rng.dirichlet([0.2, 0.2, 1])
 93            n_rooms = min(9, (2 * n_rooms + 2).astype(int) ** 2)
 94            room_size = min(9, (7.0 * room_size + 3).astype(int))
 95            if is_episodic:
 96                room_size = max(room_size - 3, 3)
 97            sample = dict(
 98                room_size=room_size,
 99                n_rooms=n_rooms,
100                n_starting_states=rng.randint(1, 5),
101                p_rand=p_rand,
102                p_lazy=p_lazy,
103                make_reward_stochastic=rng.choice([True, False]),
104                reward_variance_multiplier=2 * rng.random() + 0.005,
105            )
106            sample["p_rand"] = None if sample["p_rand"] < 0.01 else sample["p_rand"]
107            sample["p_lazy"] = None if sample["p_lazy"] < 0.01 else sample["p_lazy"]
108
109            if sample["make_reward_stochastic"]:
110                size = int(sample["room_size"] * sample["n_rooms"] ** 0.5)
111                sample["optimal_distribution"] = (
112                    "beta",
113                    (
114                        sample["reward_variance_multiplier"],
115                        sample["reward_variance_multiplier"] * (size ** 2 - 1),
116                    ),
117                )
118                sample["other_distribution"] = (
119                    "beta",
120                    (
121                        sample["reward_variance_multiplier"] * (size ** 2 - 1),
122                        sample["reward_variance_multiplier"],
123                    ),
124                )
125            else:
126                sample["optimal_distribution"] = ("deterministic", (1.0,))
127                sample["other_distribution"] = ("deterministic", (0.0,))
128
129            samples.append(rounding_nested_structure(sample))
130        return samples
131
132    @staticmethod
133    def get_node_class() -> Type["NODE_TYPE"]:
134        return MiniGridRoomsNode
135
136    def get_gin_parameters(self, index: int) -> str:
137        prms = dict(
138            room_size=self._room_size,
139            n_rooms=self._n_rooms,
140            n_starting_states=self._n_starting_states,
141            make_reward_stochastic=self._make_reward_stochastic,
142            reward_variance_multiplier=self._reward_variance_multiplier,
143            optimal_distribution=(
144                self._optimal_distribution.dist.name,
145                self._optimal_distribution.args,
146            ),
147            other_distribution=(
148                self._other_distribution.dist.name,
149                self._other_distribution.args,
150            ),
151        )
152
153        if self._p_rand is not None:
154            prms["p_rand"] = self._p_rand
155        if self._p_lazy is not None:
156            prms["p_lazy"] = self._p_lazy
157
158        return MiniGridRoomsMDP.produce_gin_file_from_mdp_parameters(
159            prms, type(self).__name__, index
160        )
161
162    @property
163    def n_actions(self) -> int:
164        return len(MiniGridRoomsAction)
165
166    @property
167    def _admissible_coordinate(self) -> list:
168        rooms_per_row = int(np.sqrt(self._n_rooms))
169
170        vertical_checkers = [
171            j * (self._room_size) + j + int(np.floor(self._room_size / 2))
172            for j in range(rooms_per_row)
173        ]
174        horizontal_checkers = [
175            j * self._room_size + j - 1 for j in range(1, rooms_per_row)
176        ]
177        door_positions = list(product(horizontal_checkers, vertical_checkers)) + list(
178            product(vertical_checkers, horizontal_checkers)
179        )
180        rooms_coordinates = []
181        for room_coord in product(range(rooms_per_row), range(rooms_per_row)):
182            room = self.get_positions_coords_in_room(self._room_size, room_coord)
183            for c in room.ravel().tolist():
184                rooms_coordinates.append(tuple(c))
185        return rooms_coordinates + door_positions
186
187    @staticmethod
188    def get_positions_coords_in_room(
189        room_size: int, room_coord: Tuple[int, int]
190    ) -> np.array:
191        x_room_coord, y_room_coord = room_coord
192        nodes = np.zeros((room_size, room_size), dtype=object)
193        for i in range(room_size):
194            for j in range(room_size):
195                nodes[j, i] = (
196                    i + (room_size + 1) * x_room_coord,
197                    j + (room_size + 1) * y_room_coord,
198                )
199        nodes = nodes[::-1]
200        return nodes
201
202    def _get_next_nodes_parameters(
203        self, node: "NODE_TYPE", action: "ACTION_TYPE"
204    ) -> Tuple[Tuple[dict, float], ...]:
205        d = node.Dir
206        if action == MiniGridRoomsAction.TurnRight:
207            return ((dict(X=node.X, Y=node.Y, Dir=((d + 1) % 4)), 1.0),)
208        if action == MiniGridRoomsAction.TurnLeft:
209            return ((dict(X=node.X, Y=node.Y, Dir=((d - 1) % 4)), 1.0),)
210        if action == MiniGridRoomsAction.MoveForward:
211            if d == MiniGridRoomsDirection.UP:
212                next_coord = (node.X, node.Y + 1)
213            if d == MiniGridRoomsDirection.RIGHT:
214                next_coord = node.X + 1, node.Y
215            if d == MiniGridRoomsDirection.DOWN:
216                next_coord = node.X, node.Y - 1
217            if d == MiniGridRoomsDirection.LEFT:
218                next_coord = node.X - 1, node.Y
219            if next_coord in self._admissible_coordinate:
220                return ((dict(X=next_coord[0], Y=next_coord[1], Dir=d), 1.0),)
221            return ((asdict(node), 1.0),)
222
223    def _get_reward_distribution(
224        self, node: "NODE_TYPE", action: "ACTION_TYPE", next_node: "NODE_TYPE"
225    ) -> rv_continuous:
226        return (
227            self._optimal_distribution
228            if next_node.X == self.goal_position[0]
229            and next_node.Y == self.goal_position[1]
230            else self._other_distribution
231        )
232
233    def _get_starting_node_sampler(self) -> NextStateSampler:
234        rooms_per_row = int(np.sqrt(self._n_rooms))
235        rooms = list(product(range(rooms_per_row), range(rooms_per_row)))
236
237        corner_rooms = list(product((0, int(self._n_rooms ** 0.5) - 1), repeat=2))
238        sr = self._fast_rng.randint(0, len(corner_rooms) - 1)
239        self.starting_room = corner_rooms[sr]
240        corner_rooms.pop(sr)
241        self.goal_room = corner_rooms[self._fast_rng.randint(0, len(corner_rooms) - 1)]
242        assert self.goal_room != self.starting_room
243
244        # Random goal position from a random room
245        goal_positions = (
246            self.get_positions_coords_in_room(self._room_size, self.goal_room)
247            .ravel()
248            .tolist()
249        )
250        self._rng.shuffle(goal_positions)
251        self.goal_position = goal_positions[0]
252
253        # Random starting position from a random room
254        starting_nodes = [
255            MiniGridRoomsNode(x, y, MiniGridRoomsDirection(d))
256            for x, y in self.get_positions_coords_in_room(
257                self._room_size, self.starting_room
258            )
259            .ravel()
260            .tolist()
261            for d in range(4)
262        ]
263        self._rng.shuffle(starting_nodes)
264        self.__possible_starting_nodes = starting_nodes
265
266        return NextStateSampler(
267            next_nodes=self._possible_starting_nodes[: self._n_starting_states],
268            probs=[1 / self._n_starting_states for _ in range(self._n_starting_states)],
269            seed=self._produce_random_seed(),
270        )
271
272    def _check_parameters_in_input(self):
273        super(MiniGridRoomsMDP, self)._check_parameters_in_input()
274
275        assert self._n_rooms >= 4, "There should be at least 4 rooms"
276        assert self._room_size >= 2, "The room size must be at least 2"
277        assert int(np.sqrt(self._n_rooms)) == np.sqrt(
278            self._n_rooms
279        ), "Please provide a number of rooms with perfect square."
280
281        assert self._n_starting_states > 0
282
283        dists = [
284            self._optimal_distribution,
285            self._other_distribution,
286        ]
287        check_distributions(
288            dists,
289            self._make_reward_stochastic,
290        )
291
292    def _get_grid_representation(self, node: "NODE_TYPE") -> np.ndarray:
293        rooms_per_row = int(np.sqrt(self._n_rooms))
294        door_positions = [
295            int(self._room_size // 2) + i * (self._room_size + 1) + 1
296            for i in range(rooms_per_row)
297        ]
298        grid_size = rooms_per_row * self._room_size + rooms_per_row - 1
299        grid = np.zeros((grid_size, grid_size), dtype=str)
300
301        for x in range(1, grid_size + 1):
302            for y in range(1, grid_size + 1):
303                if (
304                    x != 0
305                    and x != (grid_size)
306                    and x % (self._room_size + 1) == 0
307                    and not y in door_positions
308                ):
309                    grid[y - 1, x - 1] = "W"
310                    continue
311                elif (
312                    y != 0
313                    and y != (grid_size)
314                    and y % (self._room_size + 1) == 0
315                    and not x in door_positions
316                ):
317                    grid[y - 1, x - 1] = "W"
318                    continue
319                else:
320                    grid[y - 1, x - 1] = " "
321
322        grid[self.goal_position[1], self.goal_position[0]] = "G"
323
324        if self.cur_node.Dir == MiniGridRoomsDirection.UP:
325            grid[self.cur_node.Y, self.cur_node.X] = "^"
326        elif self.cur_node.Dir == MiniGridRoomsDirection.RIGHT:
327            grid[self.cur_node.Y, self.cur_node.X] = ">"
328        elif self.cur_node.Dir == MiniGridRoomsDirection.DOWN:
329            grid[self.cur_node.Y, self.cur_node.X] = "v"
330        elif self.cur_node.Dir == MiniGridRoomsDirection.LEFT:
331            grid[self.cur_node.Y, self.cur_node.X] = "<"
332
333        return grid[::-1, :]
334
335    @property
336    def _possible_starting_nodes(self) -> List["NODE_TYPE"]:
337        return self.__possible_starting_nodes
338
339    @property
340    def parameters(self) -> Dict[str, Any]:
341        return {
342            **super(MiniGridRoomsMDP, self).parameters,
343            **dict(
344                room_size=self._room_size,
345                n_rooms=self._n_rooms,
346                n_starting_states=self._n_starting_states,
347                optimal_distribution=self._optimal_distribution,
348                other_distribution=self._other_distribution,
349            ),
350        }
351
352    def __init__(
353        self,
354        seed: int,
355        room_size: int,
356        n_rooms: int = 4,
357        n_starting_states: int = 2,
358        optimal_distribution: Union[Tuple, rv_continuous] = None,
359        other_distribution: Union[Tuple, rv_continuous] = None,
360        make_reward_stochastic=False,
361        reward_variance_multiplier: float = 1.0,
362        **kwargs,
363    ):
364        """
365
366        Parameters
367        ----------
368        seed : int
369            The seed used for sampling rewards and next states.
370        room_size : int
371            The size of the roorms.
372        n_rooms : int
373            The number of rooms. This must be a squared number.
374        n_starting_states : int
375            The number of possible starting states.
376        optimal_distribution : Union[Tuple, rv_continuous]
377            The distribution of the highly rewarding state. It can be either passed as a tuple containing Beta parameters
378            or as a rv_continuous object.
379        other_distribution : Union[Tuple, rv_continuous]
380            The distribution of the other states. It can be either passed as a tuple containing Beta parameters or as a
381            rv_continuous object.
382        make_reward_stochastic : bool
383            If True, the rewards of the MDP will be stochastic. By default, it is set to False.
384        reward_variance_multiplier : float
385            A constant that can be used to increase the variance of the reward distributions without changing their means.
386            The lower the value, the higher the variance. By default, it is set to 1.
387        """
388
389        if type(optimal_distribution) == tuple:
390            optimal_distribution = get_dist(
391                optimal_distribution[0], optimal_distribution[1]
392            )
393        if type(other_distribution) == tuple:
394            other_distribution = get_dist(other_distribution[0], other_distribution[1])
395
396        self._n_starting_states = n_starting_states
397        self._room_size = room_size
398        self._n_rooms = n_rooms
399
400        dists = [
401            optimal_distribution,
402            other_distribution,
403        ]
404        if dists.count(None) == 0:
405            self._optimal_distribution = optimal_distribution
406            self._other_distribution = other_distribution
407        else:
408            if make_reward_stochastic:
409                size = int(room_size * n_rooms ** 0.5)
410                self._other_distribution = beta(
411                    reward_variance_multiplier,
412                    reward_variance_multiplier * (size ** 2 - 1),
413                )
414                self._optimal_distribution = beta(
415                    reward_variance_multiplier * (size ** 2 - 1),
416                    reward_variance_multiplier,
417                )
418            else:
419                self._optimal_distribution = deterministic(1.0)
420                self._other_distribution = deterministic(0.0)
421
422        super(MiniGridRoomsMDP, self).__init__(
423            seed=seed,
424            reward_variance_multiplier=reward_variance_multiplier,
425            make_reward_stochastic=make_reward_stochastic,
426            **kwargs,
427        )

The base class for the MiniGrid-Rooms family.

MiniGridRoomsMDP( seed: int, room_size: int, n_rooms: int = 4, n_starting_states: int = 2, optimal_distribution: Union[Tuple, scipy.stats._distn_infrastructure.rv_continuous] = None, other_distribution: Union[Tuple, scipy.stats._distn_infrastructure.rv_continuous] = None, make_reward_stochastic=False, reward_variance_multiplier: float = 1.0, **kwargs)
352    def __init__(
353        self,
354        seed: int,
355        room_size: int,
356        n_rooms: int = 4,
357        n_starting_states: int = 2,
358        optimal_distribution: Union[Tuple, rv_continuous] = None,
359        other_distribution: Union[Tuple, rv_continuous] = None,
360        make_reward_stochastic=False,
361        reward_variance_multiplier: float = 1.0,
362        **kwargs,
363    ):
364        """
365
366        Parameters
367        ----------
368        seed : int
369            The seed used for sampling rewards and next states.
370        room_size : int
371            The size of the roorms.
372        n_rooms : int
373            The number of rooms. This must be a squared number.
374        n_starting_states : int
375            The number of possible starting states.
376        optimal_distribution : Union[Tuple, rv_continuous]
377            The distribution of the highly rewarding state. It can be either passed as a tuple containing Beta parameters
378            or as a rv_continuous object.
379        other_distribution : Union[Tuple, rv_continuous]
380            The distribution of the other states. It can be either passed as a tuple containing Beta parameters or as a
381            rv_continuous object.
382        make_reward_stochastic : bool
383            If True, the rewards of the MDP will be stochastic. By default, it is set to False.
384        reward_variance_multiplier : float
385            A constant that can be used to increase the variance of the reward distributions without changing their means.
386            The lower the value, the higher the variance. By default, it is set to 1.
387        """
388
389        if type(optimal_distribution) == tuple:
390            optimal_distribution = get_dist(
391                optimal_distribution[0], optimal_distribution[1]
392            )
393        if type(other_distribution) == tuple:
394            other_distribution = get_dist(other_distribution[0], other_distribution[1])
395
396        self._n_starting_states = n_starting_states
397        self._room_size = room_size
398        self._n_rooms = n_rooms
399
400        dists = [
401            optimal_distribution,
402            other_distribution,
403        ]
404        if dists.count(None) == 0:
405            self._optimal_distribution = optimal_distribution
406            self._other_distribution = other_distribution
407        else:
408            if make_reward_stochastic:
409                size = int(room_size * n_rooms ** 0.5)
410                self._other_distribution = beta(
411                    reward_variance_multiplier,
412                    reward_variance_multiplier * (size ** 2 - 1),
413                )
414                self._optimal_distribution = beta(
415                    reward_variance_multiplier * (size ** 2 - 1),
416                    reward_variance_multiplier,
417                )
418            else:
419                self._optimal_distribution = deterministic(1.0)
420                self._other_distribution = deterministic(0.0)
421
422        super(MiniGridRoomsMDP, self).__init__(
423            seed=seed,
424            reward_variance_multiplier=reward_variance_multiplier,
425            make_reward_stochastic=make_reward_stochastic,
426            **kwargs,
427        )
Parameters
  • seed (int): The seed used for sampling rewards and next states.
  • room_size (int): The size of the rooms.
  • n_rooms (int): The number of rooms. This must be a perfect square.
  • n_starting_states (int): The number of possible starting states.
  • optimal_distribution (Union[Tuple, rv_continuous]): The distribution of the highly rewarding state. It can be either passed as a tuple containing Beta parameters or as a rv_continuous object.
  • other_distribution (Union[Tuple, rv_continuous]): The distribution of the other states. It can be either passed as a tuple containing Beta parameters or as a rv_continuous object.
  • make_reward_stochastic (bool): If True, the rewards of the MDP will be stochastic. By default, it is set to False.
  • reward_variance_multiplier (float): A constant that can be used to increase the variance of the reward distributions without changing their means. The lower the value, the higher the variance. By default, it is set to 1.
@staticmethod
def get_unique_symbols() -> List[str]:
76    @staticmethod
77    def get_unique_symbols() -> List[str]:
78        return [" ", ">", "<", "v", "^", "G", "W"]
Returns
  • List[str]: the unique symbols of the grid representation of the MDP.
@staticmethod
def does_seed_change_MDP_structure() -> bool:
80    @staticmethod
81    def does_seed_change_MDP_structure() -> bool:
82        return True
Returns
  • bool: True if changing the seed changes the transition matrix and/or the rewards matrix. For example, this may
  • happen when there are fewer starting states than possible ones and the effective starting states are picked
  • randomly based on the seed.
@staticmethod
def sample_mdp_parameters(n: int, is_episodic: bool, seed: int = None) -> List[Dict[str, Any]]:
 84    @staticmethod
 85    def sample_mdp_parameters(
 86        n: int, is_episodic: bool, seed: int = None
 87    ) -> List[Dict[str, Any]]:
 88        rng = np.random.RandomState(np.random.randint(10_000) if seed is None else seed)
 89        samples = []
 90        for _ in range(n):
 91            p_rand, p_lazy, _ = 0.9 * rng.dirichlet([0.2, 0.2, 5])
 92            n_rooms, room_size, _ = rng.dirichlet([0.2, 0.2, 1])
 93            n_rooms = min(9, (2 * n_rooms + 2).astype(int) ** 2)
 94            room_size = min(9, (7.0 * room_size + 3).astype(int))
 95            if is_episodic:
 96                room_size = max(room_size - 3, 3)
 97            sample = dict(
 98                room_size=room_size,
 99                n_rooms=n_rooms,
100                n_starting_states=rng.randint(1, 5),
101                p_rand=p_rand,
102                p_lazy=p_lazy,
103                make_reward_stochastic=rng.choice([True, False]),
104                reward_variance_multiplier=2 * rng.random() + 0.005,
105            )
106            sample["p_rand"] = None if sample["p_rand"] < 0.01 else sample["p_rand"]
107            sample["p_lazy"] = None if sample["p_lazy"] < 0.01 else sample["p_lazy"]
108
109            if sample["make_reward_stochastic"]:
110                size = int(sample["room_size"] * sample["n_rooms"] ** 0.5)
111                sample["optimal_distribution"] = (
112                    "beta",
113                    (
114                        sample["reward_variance_multiplier"],
115                        sample["reward_variance_multiplier"] * (size ** 2 - 1),
116                    ),
117                )
118                sample["other_distribution"] = (
119                    "beta",
120                    (
121                        sample["reward_variance_multiplier"] * (size ** 2 - 1),
122                        sample["reward_variance_multiplier"],
123                    ),
124                )
125            else:
126                sample["optimal_distribution"] = ("deterministic", (1.0,))
127                sample["other_distribution"] = ("deterministic", (0.0,))
128
129            samples.append(rounding_nested_structure(sample))
130        return samples
Returns
  • List[Dict[str, Any]]: n sampled parameters that can be used to construct an MDP in a reasonable amount of time.
def get_gin_parameters(self, index: int) -> str:
136    def get_gin_parameters(self, index: int) -> str:
137        prms = dict(
138            room_size=self._room_size,
139            n_rooms=self._n_rooms,
140            n_starting_states=self._n_starting_states,
141            make_reward_stochastic=self._make_reward_stochastic,
142            reward_variance_multiplier=self._reward_variance_multiplier,
143            optimal_distribution=(
144                self._optimal_distribution.dist.name,
145                self._optimal_distribution.args,
146            ),
147            other_distribution=(
148                self._other_distribution.dist.name,
149                self._other_distribution.args,
150            ),
151        )
152
153        if self._p_rand is not None:
154            prms["p_rand"] = self._p_rand
155        if self._p_lazy is not None:
156            prms["p_lazy"] = self._p_lazy
157
158        return MiniGridRoomsMDP.produce_gin_file_from_mdp_parameters(
159            prms, type(self).__name__, index
160        )
Returns
  • str: The gin config of the MDP instance.
n_actions: int
Returns
  • int: The number of available actions.
@staticmethod
def get_positions_coords_in_room(room_size: int, room_coord: Tuple[int, int]) -> <built-in function array>:
187    @staticmethod
188    def get_positions_coords_in_room(
189        room_size: int, room_coord: Tuple[int, int]
190    ) -> np.array:
191        x_room_coord, y_room_coord = room_coord
192        nodes = np.zeros((room_size, room_size), dtype=object)
193        for i in range(room_size):
194            for j in range(room_size):
195                nodes[j, i] = (
196                    i + (room_size + 1) * x_room_coord,
197                    j + (room_size + 1) * y_room_coord,
198                )
199        nodes = nodes[::-1]
200        return nodes
parameters: Dict[str, Any]
Returns
  • Dict[str, Any]: The parameters of the MDP.