colosseum.mdp.minigrid_rooms.base
import abc
from dataclasses import asdict, dataclass
from enum import IntEnum
from itertools import product
from typing import TYPE_CHECKING, Any, Dict, List, Tuple, Type, Union

import numpy as np
from scipy.stats import beta, rv_continuous

from colosseum.mdp import BaseMDP
from colosseum.mdp.utils.custom_samplers import NextStateSampler
from colosseum.utils.miscellanea import (
    check_distributions,
    deterministic,
    get_dist,
    rounding_nested_structure,
)

if TYPE_CHECKING:
    from colosseum.mdp import ACTION_TYPE, NODE_TYPE


class MiniGridRoomsAction(IntEnum):
    """
    The actions available in the MiniGridRooms MDP.
    """

    MoveForward = 0
    TurnRight = 1
    TurnLeft = 2


class MiniGridRoomsDirection(IntEnum):
    """The possible agent directions in the MiniGridRooms MDP."""

    UP = 0
    RIGHT = 1
    DOWN = 2
    LEFT = 3

    def grid_movement(self) -> np.array:
        """:returns: the movement in grid space corresponding to the direction."""
        if self == MiniGridRoomsDirection.UP:
            return np.array((0, 1))
        elif self == MiniGridRoomsDirection.DOWN:
            return np.array((0, -1))
        elif self == MiniGridRoomsDirection.RIGHT:
            return np.array((1, 0))
        else:
            return np.array((-1, 0))


@dataclass(frozen=True)
class MiniGridRoomsNode:
    """
    The node for the MiniGrid-Rooms MDP.
    """

    X: int
    """x coordinate."""
    Y: int
    """y coordinate."""
    Dir: MiniGridRoomsDirection
    """The direction the agent is facing."""

    def __str__(self):
        return f"X={self.X},Y={self.Y},Dir={MiniGridRoomsDirection(self.Dir).name}"


class MiniGridRoomsMDP(BaseMDP, abc.ABC):
    """
    The base class for the MiniGrid-Rooms family.
    """

    @staticmethod
    def get_unique_symbols() -> List[str]:
        return [" ", ">", "<", "v", "^", "G", "W"]

    @staticmethod
    def does_seed_change_MDP_structure() -> bool:
        return True

    @staticmethod
    def sample_mdp_parameters(
        n: int, is_episodic: bool, seed: int = None
    ) -> List[Dict[str, Any]]:
        rng = np.random.RandomState(np.random.randint(10_000) if seed is None else seed)
        samples = []
        for _ in range(n):
            p_rand, p_lazy, _ = 0.9 * rng.dirichlet([0.2, 0.2, 5])
            n_rooms, room_size, _ = rng.dirichlet([0.2, 0.2, 1])
            n_rooms = min(9, (2 * n_rooms + 2).astype(int) ** 2)
            room_size = min(9, (7.0 * room_size + 3).astype(int))
            if is_episodic:
                room_size = max(room_size - 3, 3)
            sample = dict(
                room_size=room_size,
                n_rooms=n_rooms,
                n_starting_states=rng.randint(1, 5),
                p_rand=p_rand,
                p_lazy=p_lazy,
                make_reward_stochastic=rng.choice([True, False]),
                reward_variance_multiplier=2 * rng.random() + 0.005,
            )
            sample["p_rand"] = None if sample["p_rand"] < 0.01 else sample["p_rand"]
            sample["p_lazy"] = None if sample["p_lazy"] < 0.01 else sample["p_lazy"]

            if sample["make_reward_stochastic"]:
                size = int(sample["room_size"] * sample["n_rooms"] ** 0.5)
                sample["optimal_distribution"] = (
                    "beta",
                    (
                        sample["reward_variance_multiplier"],
                        sample["reward_variance_multiplier"] * (size ** 2 - 1),
                    ),
                )
                sample["other_distribution"] = (
                    "beta",
                    (
                        sample["reward_variance_multiplier"] * (size ** 2 - 1),
                        sample["reward_variance_multiplier"],
                    ),
                )
            else:
                sample["optimal_distribution"] = ("deterministic", (1.0,))
                sample["other_distribution"] = ("deterministic", (0.0,))

            samples.append(rounding_nested_structure(sample))
        return samples

    @staticmethod
    def get_node_class() -> Type["NODE_TYPE"]:
        return MiniGridRoomsNode

    def get_gin_parameters(self, index: int) -> str:
        prms = dict(
            room_size=self._room_size,
            n_rooms=self._n_rooms,
            n_starting_states=self._n_starting_states,
            make_reward_stochastic=self._make_reward_stochastic,
            reward_variance_multiplier=self._reward_variance_multiplier,
            optimal_distribution=(
                self._optimal_distribution.dist.name,
                self._optimal_distribution.args,
            ),
            other_distribution=(
                self._other_distribution.dist.name,
                self._other_distribution.args,
            ),
        )

        if self._p_rand is not None:
            prms["p_rand"] = self._p_rand
        if self._p_lazy is not None:
            prms["p_lazy"] = self._p_lazy

        return MiniGridRoomsMDP.produce_gin_file_from_mdp_parameters(
            prms, type(self).__name__, index
        )

    @property
    def n_actions(self) -> int:
        return len(MiniGridRoomsAction)

    @property
    def _admissible_coordinate(self) -> list:
        rooms_per_row = int(np.sqrt(self._n_rooms))

        vertical_checkers = [
            j * (self._room_size) + j + int(np.floor(self._room_size / 2))
            for j in range(rooms_per_row)
        ]
        horizontal_checkers = [
            j * self._room_size + j - 1 for j in range(1, rooms_per_row)
        ]
        door_positions = list(product(horizontal_checkers, vertical_checkers)) + list(
            product(vertical_checkers, horizontal_checkers)
        )
        rooms_coordinates = []
        for room_coord in product(range(rooms_per_row), range(rooms_per_row)):
            room = self.get_positions_coords_in_room(self._room_size, room_coord)
            for c in room.ravel().tolist():
                rooms_coordinates.append(tuple(c))
        return rooms_coordinates + door_positions

    @staticmethod
    def get_positions_coords_in_room(
        room_size: int, room_coord: Tuple[int, int]
    ) -> np.array:
        x_room_coord, y_room_coord = room_coord
        nodes = np.zeros((room_size, room_size), dtype=object)
        for i in range(room_size):
            for j in range(room_size):
                nodes[j, i] = (
                    i + (room_size + 1) * x_room_coord,
                    j + (room_size + 1) * y_room_coord,
                )
        nodes = nodes[::-1]
        return nodes

    def _get_next_nodes_parameters(
        self, node: "NODE_TYPE", action: "ACTION_TYPE"
    ) -> Tuple[Tuple[dict, float], ...]:
        d = node.Dir
        if action == MiniGridRoomsAction.TurnRight:
            return ((dict(X=node.X, Y=node.Y, Dir=((d + 1) % 4)), 1.0),)
        if action == MiniGridRoomsAction.TurnLeft:
            return ((dict(X=node.X, Y=node.Y, Dir=((d - 1) % 4)), 1.0),)
        if action == MiniGridRoomsAction.MoveForward:
            if d == MiniGridRoomsDirection.UP:
                next_coord = (node.X, node.Y + 1)
            if d == MiniGridRoomsDirection.RIGHT:
                next_coord = node.X + 1, node.Y
            if d == MiniGridRoomsDirection.DOWN:
                next_coord = node.X, node.Y - 1
            if d == MiniGridRoomsDirection.LEFT:
                next_coord = node.X - 1, node.Y
            if next_coord in self._admissible_coordinate:
                return ((dict(X=next_coord[0], Y=next_coord[1], Dir=d), 1.0),)
        return ((asdict(node), 1.0),)

    def _get_reward_distribution(
        self, node: "NODE_TYPE", action: "ACTION_TYPE", next_node: "NODE_TYPE"
    ) -> rv_continuous:
        return (
            self._optimal_distribution
            if next_node.X == self.goal_position[0]
            and next_node.Y == self.goal_position[1]
            else self._other_distribution
        )

    def _get_starting_node_sampler(self) -> NextStateSampler:
        rooms_per_row = int(np.sqrt(self._n_rooms))
        rooms = list(product(range(rooms_per_row), range(rooms_per_row)))

        corner_rooms = list(product((0, int(self._n_rooms ** 0.5) - 1), repeat=2))
        sr = self._fast_rng.randint(0, len(corner_rooms) - 1)
        self.starting_room = corner_rooms[sr]
        corner_rooms.pop(sr)
        self.goal_room = corner_rooms[self._fast_rng.randint(0, len(corner_rooms) - 1)]
        assert self.goal_room != self.starting_room

        # Random goal position from a random room
        goal_positions = (
            self.get_positions_coords_in_room(self._room_size, self.goal_room)
            .ravel()
            .tolist()
        )
        self._rng.shuffle(goal_positions)
        self.goal_position = goal_positions[0]

        # Random starting position from a random room
        starting_nodes = [
            MiniGridRoomsNode(x, y, MiniGridRoomsDirection(d))
            for x, y in self.get_positions_coords_in_room(
                self._room_size, self.starting_room
            )
            .ravel()
            .tolist()
            for d in range(4)
        ]
        self._rng.shuffle(starting_nodes)
        self.__possible_starting_nodes = starting_nodes

        return NextStateSampler(
            next_nodes=self._possible_starting_nodes[: self._n_starting_states],
            probs=[1 / self._n_starting_states for _ in range(self._n_starting_states)],
            seed=self._produce_random_seed(),
        )

    def _check_parameters_in_input(self):
        super(MiniGridRoomsMDP, self)._check_parameters_in_input()

        assert self._n_rooms >= 4, "There should be at least 4 rooms"
        assert self._room_size >= 2, "The room size must be at least 2"
        assert int(np.sqrt(self._n_rooms)) == np.sqrt(
            self._n_rooms
        ), "Please provide a number of rooms that is a perfect square."

        assert self._n_starting_states > 0

        dists = [
            self._optimal_distribution,
            self._other_distribution,
        ]
        check_distributions(
            dists,
            self._make_reward_stochastic,
        )

    def _get_grid_representation(self, node: "NODE_TYPE") -> np.ndarray:
        rooms_per_row = int(np.sqrt(self._n_rooms))
        door_positions = [
            int(self._room_size // 2) + i * (self._room_size + 1) + 1
            for i in range(rooms_per_row)
        ]
        grid_size = rooms_per_row * self._room_size + rooms_per_row - 1
        grid = np.zeros((grid_size, grid_size), dtype=str)

        for x in range(1, grid_size + 1):
            for y in range(1, grid_size + 1):
                if (
                    x != 0
                    and x != (grid_size)
                    and x % (self._room_size + 1) == 0
                    and not y in door_positions
                ):
                    grid[y - 1, x - 1] = "W"
                    continue
                elif (
                    y != 0
                    and y != (grid_size)
                    and y % (self._room_size + 1) == 0
                    and not x in door_positions
                ):
                    grid[y - 1, x - 1] = "W"
                    continue
                else:
                    grid[y - 1, x - 1] = " "

        grid[self.goal_position[1], self.goal_position[0]] = "G"

        if self.cur_node.Dir == MiniGridRoomsDirection.UP:
            grid[self.cur_node.Y, self.cur_node.X] = "^"
        elif self.cur_node.Dir == MiniGridRoomsDirection.RIGHT:
            grid[self.cur_node.Y, self.cur_node.X] = ">"
        elif self.cur_node.Dir == MiniGridRoomsDirection.DOWN:
            grid[self.cur_node.Y, self.cur_node.X] = "v"
        elif self.cur_node.Dir == MiniGridRoomsDirection.LEFT:
            grid[self.cur_node.Y, self.cur_node.X] = "<"

        return grid[::-1, :]

    @property
    def _possible_starting_nodes(self) -> List["NODE_TYPE"]:
        return self.__possible_starting_nodes

    @property
    def parameters(self) -> Dict[str, Any]:
        return {
            **super(MiniGridRoomsMDP, self).parameters,
            **dict(
                room_size=self._room_size,
                n_rooms=self._n_rooms,
                n_starting_states=self._n_starting_states,
                optimal_distribution=self._optimal_distribution,
                other_distribution=self._other_distribution,
            ),
        }

    def __init__(
        self,
        seed: int,
        room_size: int,
        n_rooms: int = 4,
        n_starting_states: int = 2,
        optimal_distribution: Union[Tuple, rv_continuous] = None,
        other_distribution: Union[Tuple, rv_continuous] = None,
        make_reward_stochastic=False,
        reward_variance_multiplier: float = 1.0,
        **kwargs,
    ):
        """
        Parameters
        ----------
        seed : int
            The seed used for sampling rewards and next states.
        room_size : int
            The size of the rooms.
        n_rooms : int
            The number of rooms. This must be a perfect square.
        n_starting_states : int
            The number of possible starting states.
        optimal_distribution : Union[Tuple, rv_continuous]
            The distribution of the highly rewarding state. It can be either passed as a tuple
            containing Beta parameters or as a rv_continuous object.
        other_distribution : Union[Tuple, rv_continuous]
            The distribution of the other states. It can be either passed as a tuple containing
            Beta parameters or as a rv_continuous object.
        make_reward_stochastic : bool
            If True, the rewards of the MDP will be stochastic. By default, it is set to False.
        reward_variance_multiplier : float
            A constant that can be used to increase the variance of the reward distributions
            without changing their means. The lower the value, the higher the variance. By
            default, it is set to 1.
        """

        if type(optimal_distribution) == tuple:
            optimal_distribution = get_dist(
                optimal_distribution[0], optimal_distribution[1]
            )
        if type(other_distribution) == tuple:
            other_distribution = get_dist(other_distribution[0], other_distribution[1])

        self._n_starting_states = n_starting_states
        self._room_size = room_size
        self._n_rooms = n_rooms

        dists = [
            optimal_distribution,
            other_distribution,
        ]
        if dists.count(None) == 0:
            self._optimal_distribution = optimal_distribution
            self._other_distribution = other_distribution
        else:
            if make_reward_stochastic:
                size = int(room_size * n_rooms ** 0.5)
                self._other_distribution = beta(
                    reward_variance_multiplier,
                    reward_variance_multiplier * (size ** 2 - 1),
                )
                self._optimal_distribution = beta(
                    reward_variance_multiplier * (size ** 2 - 1),
                    reward_variance_multiplier,
                )
            else:
                self._optimal_distribution = deterministic(1.0)
                self._other_distribution = deterministic(0.0)

        super(MiniGridRoomsMDP, self).__init__(
            seed=seed,
            reward_variance_multiplier=reward_variance_multiplier,
            make_reward_stochastic=make_reward_stochastic,
            **kwargs,
        )
class
MiniGridRoomsAction(enum.IntEnum):
The actions available in the MiniGridRooms MDP.
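Since the class is an IntEnum, actions can be used interchangeably with their integer codes. A small illustration, with values following the definition above:

[a.name for a in MiniGridRoomsAction]                      # ['MoveForward', 'TurnRight', 'TurnLeft']
MiniGridRoomsAction(1) is MiniGridRoomsAction.TurnRight    # True
int(MiniGridRoomsAction.TurnLeft)                          # 2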
Inherited Members
- enum.Enum
  - name
  - value
- builtins.int
  - conjugate
  - bit_length
  - to_bytes
  - from_bytes
  - as_integer_ratio
  - real
  - imag
  - numerator
  - denominator
class
MiniGridRoomsDirection(enum.IntEnum):
The possible agent directions in the MiniGridRooms MDP.
def
grid_movement(self) -> np.array:
:returns: the movement in grid space corresponding to the direction.
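For example, the displacement associated with a few directions, following the implementation above (x grows to the right, y grows upwards):

MiniGridRoomsDirection.RIGHT.grid_movement()   # array([1, 0])
MiniGridRoomsDirection.UP.grid_movement()      # array([0, 1])
MiniGridRoomsDirection.DOWN.grid_movement()    # array([ 0, -1])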
Inherited Members
- enum.Enum
  - name
  - value
- builtins.int
  - conjugate
  - bit_length
  - to_bytes
  - from_bytes
  - as_integer_ratio
  - real
  - imag
  - numerator
  - denominator
@dataclass(frozen=True)
class
MiniGridRoomsNode:
The node for the MiniGrid-Rooms MDP.
MiniGridRoomsNode( X: int, Y: int, Dir: colosseum.mdp.minigrid_rooms.base.MiniGridRoomsDirection)
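A minimal sketch of constructing a node and rendering it as a string; both follow directly from the dataclass definition above:

node = MiniGridRoomsNode(X=2, Y=3, Dir=MiniGridRoomsDirection.RIGHT)
str(node)   # 'X=2,Y=3,Dir=RIGHT'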
class
MiniGridRoomsMDP(colosseum.mdp.base.BaseMDP, abc.ABC):
The base class for the MiniGrid-Rooms family.
MiniGridRoomsMDP( seed: int, room_size: int, n_rooms: int = 4, n_starting_states: int = 2, optimal_distribution: Union[Tuple, scipy.stats._distn_infrastructure.rv_continuous] = None, other_distribution: Union[Tuple, scipy.stats._distn_infrastructure.rv_continuous] = None, make_reward_stochastic=False, reward_variance_multiplier: float = 1.0, **kwargs)
Parameters
- seed (int): The seed used for sampling rewards and next states.
- room_size (int): The size of the rooms.
- n_rooms (int): The number of rooms. This must be a perfect square.
- n_starting_states (int): The number of possible starting states.
- optimal_distribution (Union[Tuple, rv_continuous]): The distribution of the highly rewarding state. It can be either passed as a tuple containing Beta parameters or as a rv_continuous object.
- other_distribution (Union[Tuple, rv_continuous]): The distribution of the other states. It can be either passed as a tuple containing Beta parameters or as a rv_continuous object.
- make_reward_stochastic (bool): If True, the rewards of the MDP will be stochastic. By default, it is set to False.
- reward_variance_multiplier (float): A constant that can be used to increase the variance of the reward distributions without changing their means. The lower the value, the higher the variance. By default, it is set to 1.
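Since MiniGridRoomsMDP is abstract, instances are created through a concrete variant. A minimal sketch, assuming a concrete subclass named MiniGridRoomsEpisodic is exported by colosseum.mdp.minigrid_rooms (the class name and import path here are assumptions; substitute the variant you actually use):

# Sketch only: MiniGridRoomsEpisodic is an assumed concrete subclass.
from colosseum.mdp.minigrid_rooms import MiniGridRoomsEpisodic

mdp = MiniGridRoomsEpisodic(
    seed=42,
    room_size=4,
    n_rooms=4,                    # must be a perfect square and at least 4
    n_starting_states=2,
    make_reward_stochastic=True,  # rewards drawn from the Beta distributions described above
    reward_variance_multiplier=1.0,
)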
@staticmethod
def
get_unique_symbols() -> List[str]:
Returns
- List[str]: the unique symbols of the grid representation of the MDP.
@staticmethod
def
does_seed_change_MDP_structure() -> bool:
Returns
- bool: True if changing the seed changes the transition matrix and/or the rewards matrix. This may happen, for example, when there are fewer starting states than possible ones and the effective starting states are picked randomly based on the seed.
@staticmethod
def
sample_mdp_parameters(n: int, is_episodic: bool, seed: int = None) -> List[Dict[str, Any]]:
Returns
- List[Dict[str, Any]]: n sampled parameters that can be used to construct an MDP in a reasonable amount of time.
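A short usage sketch: the static method can be called on the class itself, and the sampled values vary with the seed:

for prms in MiniGridRoomsMDP.sample_mdp_parameters(n=3, is_episodic=True, seed=0):
    # Each dictionary holds room_size, n_rooms, n_starting_states, p_rand, p_lazy,
    # make_reward_stochastic, reward_variance_multiplier and the two reward distributions.
    print(prms["room_size"], prms["n_rooms"], prms["optimal_distribution"])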
@staticmethod
def
get_node_class() -> Type[Union[colosseum.mdp.custom_mdp.CustomNode, colosseum.mdp.river_swim.base.RiverSwimNode, colosseum.mdp.deep_sea.base.DeepSeaNode, colosseum.mdp.frozen_lake.base.FrozenLakeNode, colosseum.mdp.simple_grid.base.SimpleGridNode, colosseum.mdp.minigrid_empty.base.MiniGridEmptyNode, colosseum.mdp.minigrid_rooms.base.MiniGridRoomsNode, colosseum.mdp.taxi.base.TaxiNode]]:
Returns
- Type["NODE_TYPE"]: The class of the nodes of the MDP.
def
get_gin_parameters(self, index: int) -> str:
Returns
- str: The gin config of the MDP instance.
@staticmethod
def
get_positions_coords_in_room(room_size: int, room_coord: Tuple[int, int]) -> np.array:
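The method returns a room_size x room_size array of (x, y) grid coordinates for the room at room_coord; coordinates are offset by room_size + 1 along each axis (one cell is left for the wall between rooms) and the rows are flipped so that larger y values come first. A small worked example:

MiniGridRoomsMDP.get_positions_coords_in_room(2, (1, 0))
# array([[(3, 1), (4, 1)],
#        [(3, 0), (4, 0)]], dtype=object)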
Inherited Members
- colosseum.mdp.base.BaseMDP
  - get_available_hardness_measures
  - produce_gin_file_from_mdp_parameters
  - is_episodic
  - sample_parameters
  - get_grid_representation
  - get_gin_config
  - get_node_labels
  - get_node_action_labels
  - hash
  - instantiate_MDP
  - T
  - R
  - recurrent_nodes_set
  - communication_class
  - get_optimal_policy
  - get_worst_policy
  - get_value_functions
  - optimal_value_functions
  - worst_value_functions
  - random_value_functions
  - optimal_transition_probabilities
  - worst_transition_probabilities
  - random_transition_probabilities
  - optimal_markov_chain
  - worst_markov_chain
  - random_markov_chain
  - get_stationary_distribution
  - optimal_stationary_distribution
  - worst_stationary_distribution
  - random_stationary_distribution
  - optimal_average_rewards
  - worst_average_rewards
  - random_average_rewards
  - get_average_reward
  - optimal_average_reward
  - worst_average_reward
  - random_average_reward
  - transition_matrix_and_rewards
  - graph_layout
  - graph_metrics
  - diameter
  - sum_reciprocals_suboptimality_gaps
  - discounted_value_norm
  - undiscounted_value_norm
  - value_norm
  - measures_of_hardness
  - summary
  - hardness_report
  - get_info_class
  - get_transition_distributions
  - get_reward_distribution
  - sample_reward
  - get_measure_from_name
  - action_spec
  - observation_spec
  - get_observation
  - reset
  - step
  - random_steps
  - random_step
  - get_visitation_counts
  - reset_visitation_counts
  - get_value_node_labels
- dm_env._environment.Environment
  - reward_spec
  - discount_spec
  - close