colosseum.mdp.minigrid_empty.base
import abc
from dataclasses import dataclass
from enum import IntEnum
from itertools import product
from typing import TYPE_CHECKING, Any, Dict, List, Tuple, Type, Union

import numpy as np
from scipy.stats import beta, rv_continuous

from colosseum.mdp import BaseMDP
from colosseum.mdp.utils.custom_samplers import NextStateSampler
from colosseum.utils.miscellanea import (
    check_distributions,
    deterministic,
    get_dist,
    rounding_nested_structure,
)

if TYPE_CHECKING:
    from colosseum.mdp import ACTION_TYPE, NODE_TYPE


class MiniGridEmptyAction(IntEnum):
    """The actions available in the MiniGridEmpty MDP."""

    MoveForward = 0
    """Move the agent forward."""
    TurnRight = 1
    """Turn the agent towards the right."""
    TurnLeft = 2
    """Turn the agent towards the left."""


class MiniGridEmptyDirection(IntEnum):
    """
    The directions the agent can face in the MiniGridEmpty MDP.
    """

    UP = 0
    RIGHT = 1
    DOWN = 2
    LEFT = 3


@dataclass(frozen=True)
class MiniGridEmptyNode:
    """
    The node for the MiniGridEmpty MDP.
    """

    X: int
    """x coordinate."""
    Y: int
    """y coordinate."""
    Dir: MiniGridEmptyDirection
    """The direction the agent is facing."""

    def __str__(self):
        return f"X={self.X},Y={self.Y},Dir={self.Dir.name}"


class MiniGridEmptyMDP(BaseMDP, abc.ABC):
    """
    The base class for the MiniGridEmpty family.
    """

    @staticmethod
    def get_unique_symbols() -> List[str]:
        return [" ", ">", "<", "v", "^", "G"]

    @staticmethod
    def does_seed_change_MDP_structure() -> bool:
        return True

    @staticmethod
    def sample_mdp_parameters(
        n: int, is_episodic: bool, seed: int = None
    ) -> List[Dict[str, Any]]:
        rng = np.random.RandomState(np.random.randint(10_000) if seed is None else seed)
        samples = []
        for _ in range(n):
            p_rand, p_lazy, _ = 0.9 * rng.dirichlet([0.2, 0.2, 5])
            sample = dict(
                size=int(np.minimum(5 + (14 / (8 * rng.random() + 1.0)), 20))
                if is_episodic
                else int(1.5 * np.minimum(5 + (14 / (8 * rng.random() + 1.0)), 20)),
                n_starting_states=rng.randint(1, 5),
                p_rand=p_rand,
                p_lazy=p_lazy,
                make_reward_stochastic=rng.choice([True, False]),
                reward_variance_multiplier=2 * rng.random() + 0.005,
            )
            sample["p_rand"] = None if sample["p_rand"] < 0.01 else sample["p_rand"]
            sample["p_lazy"] = None if sample["p_lazy"] < 0.01 else sample["p_lazy"]

            if sample["make_reward_stochastic"]:
                sample["optimal_distribution"] = (
                    "beta",
                    (
                        sample["reward_variance_multiplier"],
                        sample["reward_variance_multiplier"]
                        * (sample["size"] ** 2 - 1),
                    ),
                )
                sample["other_distribution"] = (
                    "beta",
                    (
                        sample["reward_variance_multiplier"]
                        * (sample["size"] ** 2 - 1),
                        sample["reward_variance_multiplier"],
                    ),
                )
            else:
                sample["optimal_distribution"] = ("deterministic", (1.0,))
                sample["other_distribution"] = ("deterministic", (0.0,))

            samples.append(rounding_nested_structure(sample))
        return samples

    @staticmethod
    def get_node_class() -> Type["NODE_TYPE"]:
        return MiniGridEmptyNode

    def get_gin_parameters(self, index: int) -> str:
        prms = dict(
            size=self._size,
            n_starting_states=self._n_starting_states,
            make_reward_stochastic=self._make_reward_stochastic,
            reward_variance_multiplier=self._reward_variance_multiplier,
            optimal_distribution=(
                self._optimal_distribution.dist.name,
                self._optimal_distribution.args,
            ),
            other_distribution=(
                self._other_distribution.dist.name,
                self._other_distribution.args,
            ),
        )

        if self._p_rand is not None:
            prms["p_rand"] = self._p_rand
        if self._p_lazy is not None:
            prms["p_lazy"] = self._p_lazy

        return MiniGridEmptyMDP.produce_gin_file_from_mdp_parameters(
            prms, type(self).__name__, index
        )

    @property
    def n_actions(self) -> int:
        return len(MiniGridEmptyAction)

    def _get_next_nodes_parameters(
        self, node: "NODE_TYPE", action: "ACTION_TYPE"
    ) -> Tuple[Tuple[dict, float], ...]:
        d = node.Dir
        if action == MiniGridEmptyAction.TurnRight:
            return (
                (
                    dict(X=node.X, Y=node.Y, Dir=MiniGridEmptyDirection((d + 1) % 4)),
                    1.0,
                ),
            )
        if action == MiniGridEmptyAction.TurnLeft:
            return (
                (
                    dict(X=node.X, Y=node.Y, Dir=MiniGridEmptyDirection((d - 1) % 4)),
                    1.0,
                ),
            )
        if action == MiniGridEmptyAction.MoveForward:
            if d == MiniGridEmptyDirection.UP:
                return (
                    (dict(X=node.X, Y=min(node.Y + 1, self._size - 1), Dir=d), 1.0),
                )
            if d == MiniGridEmptyDirection.RIGHT:
                return (
                    (dict(X=min(self._size - 1, node.X + 1), Y=node.Y, Dir=d), 1.0),
                )
            if d == MiniGridEmptyDirection.DOWN:
                return ((dict(X=node.X, Y=max(node.Y - 1, 0), Dir=d), 1.0),)
            if d == MiniGridEmptyDirection.LEFT:
                return ((dict(X=max(0, node.X - 1), Y=node.Y, Dir=d), 1.0),)

    def _get_reward_distribution(
        self, node: "NODE_TYPE", action: "ACTION_TYPE", next_node: "NODE_TYPE"
    ) -> rv_continuous:
        return (
            self._optimal_distribution
            if next_node.X == self.goal_position[0]
            and next_node.Y == self.goal_position[1]
            else self._other_distribution
        )

    def get_positions_on_side(self, side: int) -> List[Tuple[int, int]]:
        nodes = []
        for i in range(self._size):
            for j in range(self._size):
                if side == 0:  # Starting from the left
                    nodes.append((i, j))
                elif side == 1:  # Starting from the south
                    nodes.append((j, i))
                elif side == 2:  # Starting from the right
                    nodes.append((self._size - 1 - i, self._size - 1 - j))
                else:  # Starting from the north
                    nodes.append((self._size - 1 - j, self._size - 1 - i))
        return nodes

    @property
    def _possible_starting_nodes(self) -> List["NODE_TYPE"]:
        return [
            MiniGridEmptyNode(x, y, MiniGridEmptyDirection(d))
            for (x, y), d in product(self.__possible_starting_nodes, range(4))
        ]

    def _get_starting_node_sampler(self) -> NextStateSampler:
        self.side_start = self._rng.randint(4)
        self.goal_position = self.get_positions_on_side((self.side_start + 2) % 4)[
            : self._size
        ][self._rng.randint(self._size)]
        self.__possible_starting_nodes = self.get_positions_on_side(self.side_start)[
            : self._size
        ]
        self._rng.shuffle(self.__possible_starting_nodes)
        starting_nodes = self.__possible_starting_nodes[: self._n_starting_states]
        return NextStateSampler(
            next_nodes=[
                MiniGridEmptyNode(x, y, MiniGridEmptyDirection(self._rng.randint(4)))
                for x, y in starting_nodes
            ],
            probs=[1 / len(starting_nodes) for _ in range(len(starting_nodes))],
            seed=self._produce_random_seed(),
        )

    def _check_parameters_in_input(self):
        super(MiniGridEmptyMDP, self)._check_parameters_in_input()

        assert self._size > 2, f"the size should be greater than 2"
        assert self._n_starting_states > 0

        dists = [
            self._optimal_distribution,
            self._other_distribution,
        ]
        check_distributions(
            dists,
            self._make_reward_stochastic,
        )

    def _get_grid_representation(self, node: "NODE_TYPE") -> np.ndarray:
        grid = np.zeros((self._size, self._size), dtype=str)
        grid[:, :] = " "
        grid[self.goal_position[1], self.goal_position[0]] = "G"
        if self.cur_node.Dir == MiniGridEmptyDirection.UP:
            grid[self.cur_node.Y, self.cur_node.X] = "^"
        elif self.cur_node.Dir == MiniGridEmptyDirection.RIGHT:
            grid[self.cur_node.Y, self.cur_node.X] = ">"
        elif self.cur_node.Dir == MiniGridEmptyDirection.DOWN:
            grid[self.cur_node.Y, self.cur_node.X] = "v"
        elif self.cur_node.Dir == MiniGridEmptyDirection.LEFT:
            grid[self.cur_node.Y, self.cur_node.X] = "<"
        return grid[::-1, :]

    @property
    def parameters(self) -> Dict[str, Any]:
        return {
            **super(MiniGridEmptyMDP, self).parameters,
            **dict(
                size=self._size,
                n_starting_states=self._n_starting_states,
                optimal_distribution=self._optimal_distribution,
                other_distribution=self._other_distribution,
            ),
        }

    def __init__(
        self,
        seed: int,
        size: int,
        n_starting_states: int = 1,
        optimal_distribution: Union[Tuple, rv_continuous] = None,
        other_distribution: Union[Tuple, rv_continuous] = None,
        make_reward_stochastic=False,
        reward_variance_multiplier: float = 1.0,
        **kwargs,
    ):
        """
        Parameters
        ----------
        seed : int
            The seed used for sampling rewards and next states.
        size : int
            The size of the grid.
        n_starting_states : int
            The number of possible starting states.
        optimal_distribution : Union[Tuple, rv_continuous]
            The distribution of the highly rewarding state. It can be either passed as a tuple
            containing Beta parameters or as a rv_continuous object.
        other_distribution : Union[Tuple, rv_continuous]
            The distribution of the other states. It can be either passed as a tuple containing
            Beta parameters or as a rv_continuous object.
        make_reward_stochastic : bool
            If True, the rewards of the MDP will be stochastic. By default, it is set to False.
        reward_variance_multiplier : float
            A constant that can be used to increase the variance of the reward distributions
            without changing their means. The lower the value, the higher the variance. By
            default, it is set to 1.
        """

        if type(optimal_distribution) == tuple:
            optimal_distribution = get_dist(
                optimal_distribution[0], optimal_distribution[1]
            )
        if type(other_distribution) == tuple:
            other_distribution = get_dist(other_distribution[0], other_distribution[1])

        self._n_starting_states = n_starting_states
        self._size = size

        dists = [
            optimal_distribution,
            other_distribution,
        ]
        if dists.count(None) == 0:
            self._optimal_distribution = optimal_distribution
            self._other_distribution = other_distribution
        else:
            if make_reward_stochastic:
                self._other_distribution = beta(
                    reward_variance_multiplier,
                    reward_variance_multiplier * (size ** 2 - 1),
                )
                self._optimal_distribution = beta(
                    reward_variance_multiplier * (size ** 2 - 1),
                    reward_variance_multiplier,
                )
            else:
                self._optimal_distribution = deterministic(1.0)
                self._other_distribution = deterministic(0.0)

        super(MiniGridEmptyMDP, self).__init__(
            seed=seed,
            reward_variance_multiplier=reward_variance_multiplier,
            make_reward_stochastic=make_reward_stochastic,
            **kwargs,
        )
class MiniGridEmptyAction(enum.IntEnum):
The actions available in the MiniGridEmpty MDP.
class MiniGridEmptyDirection(enum.IntEnum):
The directions the agent can face in the MiniGridEmpty MDP.
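The integer encoding of the directions is what _get_next_nodes_parameters relies on when turning: TurnRight adds 1 modulo 4 and TurnLeft subtracts 1 modulo 4. A minimal standalone sketch of that cycling, assuming the colosseum package is installed:

from colosseum.mdp.minigrid_empty.base import MiniGridEmptyDirection

d = MiniGridEmptyDirection.UP
for _ in range(4):
    # TurnRight in _get_next_nodes_parameters: the new direction is (d + 1) % 4.
    d = MiniGridEmptyDirection((d + 1) % 4)
    print(d.name)
# prints RIGHT, DOWN, LEFT, UP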
@dataclass(frozen=True)
class MiniGridEmptyNode:
The node for the MiniGridEmpty MDP.
MiniGridEmptyNode(X: int, Y: int, Dir: colosseum.mdp.minigrid_empty.base.MiniGridEmptyDirection)
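A quick construction example for the frozen dataclass; the coordinate values are arbitrary and the printed form follows the __str__ shown in the source (assuming the colosseum package is installed):

from colosseum.mdp.minigrid_empty.base import MiniGridEmptyDirection, MiniGridEmptyNode

node = MiniGridEmptyNode(X=0, Y=2, Dir=MiniGridEmptyDirection.RIGHT)
print(node)  # X=0,Y=2,Dir=RIGHT

# Because the dataclass is frozen, nodes are hashable and can be used as dictionary keys.
visits = {node: 1}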
class MiniGridEmptyMDP(BaseMDP, abc.ABC):
The base class for the MiniGridEmpty family.
MiniGridEmptyMDP(seed: int, size: int, n_starting_states: int = 1, optimal_distribution: Union[Tuple, scipy.stats._distn_infrastructure.rv_continuous] = None, other_distribution: Union[Tuple, scipy.stats._distn_infrastructure.rv_continuous] = None, make_reward_stochastic=False, reward_variance_multiplier: float = 1.0, **kwargs)
Parameters
- seed (int): The seed used for sampling rewards and next states.
- size (int): The size of the grid.
- n_starting_states (int): The number of possible starting states.
- optimal_distribution (Union[Tuple, rv_continuous]): The distribution of the highly rewarding state. It can be either passed as a tuple containing Beta parameters or as a rv_continuous object.
- other_distribution (Union[Tuple, rv_continuous]): The distribution of the other states. It can be either passed as a tuple containing Beta parameters or as a rv_continuous object.
- make_reward_stochastic (bool): If True, the rewards of the MDP will be stochastic. By default, it is set to False.
- reward_variance_multiplier (float): A constant that can be used to increase the variance of the reward distributions without changing their means. The lower the value, the higher the variance. By default, it is set to 1.
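Both optimal_distribution and other_distribution accept either a (name, parameters) tuple, resolved internally through get_dist, or an already-constructed scipy distribution object. A minimal sketch of the two formats, mirroring the Beta parameters that __init__ builds by default when make_reward_stochastic=True; the class name in the final, commented-out line is hypothetical, since MiniGridEmptyMDP itself is abstract:

from scipy.stats import beta

size = 8
k = 1.0  # reward_variance_multiplier

# Format 1: ("distribution name", (parameters, ...)) tuples.
optimal_distribution = ("beta", (k * (size ** 2 - 1), k))
other_distribution = ("beta", (k, k * (size ** 2 - 1)))

# Format 2: already-constructed scipy distribution objects with the same parameters.
optimal_rv = beta(k * (size ** 2 - 1), k)
other_rv = beta(k, k * (size ** 2 - 1))

mdp_kwargs = dict(
    seed=42,
    size=size,
    n_starting_states=2,
    make_reward_stochastic=True,
    reward_variance_multiplier=k,
    optimal_distribution=optimal_distribution,  # or optimal_rv
    other_distribution=other_distribution,      # or other_rv
)
# mdp = SomeConcreteMiniGridEmptyMDP(**mdp_kwargs)  # hypothetical concrete subclass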
@staticmethod
def get_unique_symbols() -> List[str]:
Returns
- List[str]: the unique symbols of the grid representation of the MDP.
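From _get_grid_representation in the source above, each symbol can be read as follows (a plain reference mapping, not an API of the class):

# Meaning of each symbol in the grid representation,
# as assigned in _get_grid_representation.
SYMBOL_MEANING = {
    " ": "empty cell",
    ">": "agent facing right",
    "<": "agent facing left",
    "v": "agent facing down",
    "^": "agent facing up",
    "G": "goal cell",
}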
@staticmethod
def does_seed_change_MDP_structure() -> bool:
Returns
- bool: True if changing the seed changes the transition matrix and/or the rewards matrix. This may happen, for example, when there are fewer starting states than possible ones and the effective starting states are picked randomly based on the seed.
@staticmethod
def sample_mdp_parameters(n: int, is_episodic: bool, seed: int = None) -> List[Dict[str, Any]]:
Returns
- List[Dict[str, Any]]: n sampled parameters that can be used to construct an MDP in a reasonable amount of time.
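Since sample_mdp_parameters is a static method, it can be called without instantiating the (abstract) class. A short usage sketch, assuming the colosseum package is installed:

from colosseum.mdp.minigrid_empty.base import MiniGridEmptyMDP

# Draw three reproducible parameter sets for episodic MDPs.
for params in MiniGridEmptyMDP.sample_mdp_parameters(n=3, is_episodic=True, seed=0):
    print(params["size"], params["n_starting_states"], params["optimal_distribution"])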
@staticmethod
def get_node_class() -> Type[Union[colosseum.mdp.custom_mdp.CustomNode, colosseum.mdp.river_swim.base.RiverSwimNode, colosseum.mdp.deep_sea.base.DeepSeaNode, colosseum.mdp.frozen_lake.base.FrozenLakeNode, colosseum.mdp.simple_grid.base.SimpleGridNode, colosseum.mdp.minigrid_empty.base.MiniGridEmptyNode, colosseum.mdp.minigrid_rooms.base.MiniGridRoomsNode, colosseum.mdp.taxi.base.TaxiNode]]:
Returns
- Type["NODE_TYPE"]: The class of the nodes of the MDP.
def get_gin_parameters(self, index: int) -> str:
Returns
- str: The gin config of the MDP instance.
def get_positions_on_side(self, side: int) -> List[Tuple[int, int]]:
Returns
- List[Tuple[int, int]]: All grid coordinates, enumerated starting from the given side (0: left, 1: south, 2: right, 3: north); the first `size` entries lie on that side.
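A standalone replica of the enumeration for a 3x3 grid; only the loop from the source is reproduced here, so it runs without colosseum:

def positions_on_side(size, side):
    # Same ordering as MiniGridEmptyMDP.get_positions_on_side.
    nodes = []
    for i in range(size):
        for j in range(size):
            if side == 0:    # starting from the left
                nodes.append((i, j))
            elif side == 1:  # starting from the south
                nodes.append((j, i))
            elif side == 2:  # starting from the right
                nodes.append((size - 1 - i, size - 1 - j))
            else:            # starting from the north
                nodes.append((size - 1 - j, size - 1 - i))
    return nodes

# The first `size` entries lie on the requested side; the class itself slices them
# with [: self._size] when placing the goal and the starting states.
print(positions_on_side(3, 0)[:3])  # [(0, 0), (0, 1), (0, 2)]  -> left column
print(positions_on_side(3, 2)[:3])  # [(2, 2), (2, 1), (2, 0)]  -> right column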