colosseum.mdp.taxi.base
import abc
from copy import deepcopy
from dataclasses import asdict, dataclass
from enum import IntEnum
from itertools import product
from typing import TYPE_CHECKING, Any, Dict, List, Tuple, Type, Union

import numpy as np
from scipy.stats import beta, rv_continuous

from colosseum.mdp import BaseMDP
from colosseum.mdp.utils.custom_samplers import NextStateSampler
from colosseum.utils.miscellanea import (
    check_distributions,
    deterministic,
    get_dist,
    rounding_nested_structure,
)

if TYPE_CHECKING:
    from colosseum.mdp import ACTION_TYPE, NODE_TYPE


class TaxiAction(IntEnum):
    """
    The actions available in the Taxi MDP.
    """

    MoveSouth = 0
    MoveNorth = 1
    MoveEast = 2
    MoveWest = 3
    PickUpPassenger = 4
    DropOffPassenger = 5


@dataclass(frozen=True)
class TaxiNode:
    """
    The node for the Taxi MDP.
    """

    X: int
    """x coordinate of the taxi."""
    Y: int
    """y coordinate of the taxi."""
    XPass: int
    """x coordinate of the passenger, -1 if it is on board."""
    YPass: int
    """y coordinate of the passenger, -1 if it is on board."""
    XDest: int
    """x coordinate of the destination."""
    YDest: int
    """y coordinate of the destination."""

    def __str__(self):
        return f"X={self.X},Y={self.Y},XPass={self.XPass},YPass={self.YPass},XDest={self.XDest},YDest={self.YDest}"


class TaxiMDP(BaseMDP, abc.ABC):
    """
    The base class for the Taxi family.
    """

    @staticmethod
    def get_unique_symbols() -> List[str]:
        return [" ", "A", "X", "D", "P"]

    @staticmethod
    def does_seed_change_MDP_structure() -> bool:
        return True

    @staticmethod
    def sample_mdp_parameters(
        n: int, is_episodic: bool, seed: int = None
    ) -> List[Dict[str, Any]]:
        rng = np.random.RandomState(np.random.randint(10_000) if seed is None else seed)
        samples = []
        for _ in range(n):
            p_rand, p_lazy, _ = 0.5 * rng.dirichlet([0.2, 0.2, 5])
            sample = dict(
                size=5
                if is_episodic
                else rng.choice(range(5, 8), None, True, [0.525, 0.325, 0.15]),
                p_rand=p_rand * (0.8 if is_episodic else 1),
                p_lazy=p_lazy * (0.8 if is_episodic else 1),
                make_reward_stochastic=rng.choice([True, False]),
                reward_variance_multiplier=2 * rng.random() + 0.005,
            )
            sample["p_rand"] = None if sample["p_rand"] < 0.01 else sample["p_rand"]
            sample["p_lazy"] = None if sample["p_lazy"] < 0.01 else sample["p_lazy"]

            if sample["make_reward_stochastic"]:
                sample["default_r"] = (
                    "beta",
                    (
                        sample["reward_variance_multiplier"],
                        sample["reward_variance_multiplier"] * (1 / 0.2 - 1),
                    ),
                )
                sample["successfully_delivery_r"] = (
                    "beta",
                    (
                        sample["reward_variance_multiplier"],
                        sample["reward_variance_multiplier"] * (1 / 0.9 - 1),
                    ),
                )
                sample["failure_delivery_r"] = (
                    "beta",
                    (
                        sample["reward_variance_multiplier"],
                        sample["reward_variance_multiplier"] * (10 / 0.2 - 1),
                    ),
                )
            else:
                sample["default_r"] = ("deterministic", (0.1,))
                sample["successfully_delivery_r"] = ("deterministic", (1.0,))
                sample["failure_delivery_r"] = ("deterministic", (0.0,))

            samples.append(rounding_nested_structure(sample))
        return samples

    @property
    def _quadrant_width(self):
        return self._size / int(self._n_locations ** 0.5) / 2

    @property
    def _admissible_coordinate(self):
        # Builds a 0/1 grid where 1 marks a wall cell, then returns the coordinates of the free (0) cells.
        rows = []
        j = 0
        while len(rows) < self._size:
            if j % 2 != 0:
                row = []
            else:
                row = [0] * int((self._width + self._space) // 2)
            i = 0
            while len(row) < self._size:
                row.append(int(i % (1 + self._space) == 0))
                if row[-1] == 1:
                    for _ in range(self._width - 1):
                        if len(row) == self._size:
                            break
                        row.append(1)
                i += 1
            for _ in range(self._length):
                if len(rows) == self._size:
                    break
                rows.append(row)
            if len(rows) < self._size:
                rows.append([0] * self._size)
            j += 1
        return np.vstack(np.where(np.array(rows) == 0)).T.tolist()

    @property
    def _quadrants(self):
        # Partition the grid into n_locations square quadrants and keep the admissible cells of each.
        quadrants = np.zeros((self._size, self._size))
        split = np.array_split(range(self._size), int(self._n_locations ** 0.5))
        for i, (x, y) in enumerate(product(split, split)):
            for q_coo_x, q_coo_y in product(x, y):
                quadrants[q_coo_x, q_coo_y] = i
        quadrants = [
            list(
                filter(
                    lambda x: x in self._admissible_coordinate,
                    np.vstack(np.where(quadrants == i)).T.tolist(),
                )
            )
            for i in range(self._n_locations)
        ]

        assert all(len(q) != 0 for q in quadrants)
        return quadrants

    @property
    def locations(self):
        if len(self._locations) == 0:
            # Sample one admissible cell per quadrant, resampling until all pairs are more than min_distance apart.
            re_sample = True
            min_distance = max(self._quadrant_width, 2)
            while re_sample:
                locations = [
                    self._quadrants[i][self._rng.randint(len(self._quadrants[i]))]
                    for i in range(self._n_locations)
                ]
                re_sample = False
                nplocations = np.array(locations)
                for i in range(self._n_locations):
                    for j in range(1 + i, self._n_locations):
                        diff = np.sqrt(((nplocations[i] - nplocations[j]) ** 2).sum())
                        if diff <= min_distance:
                            re_sample = True
                            break
                    if re_sample:
                        break
            self._rng.shuffle(locations)
            self._locations = locations[: self.n_locations]
        return self._locations

    @staticmethod
    def get_node_class() -> Type["NODE_TYPE"]:
        return TaxiNode

    def get_gin_parameters(self, index: int) -> str:
        prms = dict(
            size=self._size,
            make_reward_stochastic=self._make_reward_stochastic,
            reward_variance_multiplier=self._reward_variance_multiplier,
            default_r=(
                self._default_r.dist.name,
                self._default_r.args,
            ),
            successfully_delivery_r=(
                self._successfully_delivery_r.dist.name,
                self._successfully_delivery_r.args,
            ),
            failure_delivery_r=(
                self._failure_delivery_r.dist.name,
                self._failure_delivery_r.args,
            ),
        )
        if self._p_rand is not None:
            prms["p_rand"] = self._p_rand

        return TaxiMDP.produce_gin_file_from_mdp_parameters(
            prms, type(self).__name__, index
        )

    @property
    def n_actions(self) -> int:
        return len(TaxiAction)

    def _get_next_nodes_parameters(
        self, node: "NODE_TYPE", action: "ACTION_TYPE"
    ) -> Tuple[Tuple[dict, float], ...]:
        next_node_prms = asdict(node)

        if action == TaxiAction.DropOffPassenger:
            # we have the passenger and we are dropping him/her off at the right place
            if node.XPass == -1 and node.X == node.XDest and node.Y == node.YDest:
                next_nodes_prms = []

                # n counts the admissible (new passenger location, destination) pairs, so p is uniform over them
                n = 0
                for pass_loc in filter(
                    lambda loc: loc != [node.X, node.Y],
                    self.locations,
                ):
                    n += len(list(filter(lambda loc: loc != pass_loc, self.locations)))
                p = 1.0 / n

                for pass_loc in filter(
                    lambda loc: loc != [node.X, node.Y],
                    self.locations,
                ):
                    admissible_destinations = list(
                        filter(lambda loc: loc != pass_loc, self.locations)
                    )

                    for destination in admissible_destinations:
                        cur_next_node_prms: dict = deepcopy(next_node_prms)
                        (
                            cur_next_node_prms["XPass"],
                            cur_next_node_prms["YPass"],
                        ) = pass_loc
                        (
                            cur_next_node_prms["XDest"],
                            cur_next_node_prms["YDest"],
                        ) = destination
                        next_nodes_prms.append((cur_next_node_prms, p))
                return tuple(next_nodes_prms)

        if action == TaxiAction.PickUpPassenger:
            if node.XPass != -1 and node.X == node.XPass and node.Y == node.YPass:
                next_node_prms["XPass"] = -1
                next_node_prms["YPass"] = -1

        if action == TaxiAction.MoveNorth:
            next_coord = [node.X, node.Y + 1]
        elif action == TaxiAction.MoveEast:
            next_coord = [node.X + 1, node.Y]
        elif action == TaxiAction.MoveSouth:
            next_coord = [node.X, node.Y - 1]
        elif action == TaxiAction.MoveWest:
            next_coord = [node.X - 1, node.Y]
        else:
            next_coord = [node.X, node.Y]
        if next_coord in self._admissible_coordinate:
            next_node_prms["X"] = next_coord[0]
            next_node_prms["Y"] = next_coord[1]

        return ((next_node_prms, 1.0),)

    def _get_reward_distribution(
        self, node: "NODE_TYPE", action: "ACTION_TYPE", next_node: "NODE_TYPE"
    ) -> rv_continuous:
        if action == TaxiAction.PickUpPassenger:
            if next_node.XPass != -1 or node.XPass == -1:
                # We don't have the passenger
                return self._failure_delivery_r
        if action == TaxiAction.DropOffPassenger:
            if next_node.XPass == -1 or node.XPass != -1:
                # We didn't drop the passenger at the destination
                return self._failure_delivery_r
            elif node.XPass == -1 and next_node.XPass != -1:
                return self._successfully_delivery_r
        return self._default_r

    def _get_starting_node_sampler(self) -> NextStateSampler:
        # Any admissible taxi cell paired with distinct passenger and destination locations is a valid start.
        starting_nodes = []
        for (
            (pass_loc_x, pass_loc_y),
            (destination_x, destination_y),
            (taxi_x, taxi_y),
        ) in product(self.locations, self.locations, self._admissible_coordinate):
            if (pass_loc_x, pass_loc_y) == (destination_x, destination_y):
                continue

            starting_nodes.append(
                TaxiNode(
                    taxi_x, taxi_y, pass_loc_x, pass_loc_y, destination_x, destination_y
                )
            )
        self._rng.shuffle(starting_nodes)

        return NextStateSampler(
            next_nodes=starting_nodes,
            probs=[1 / len(starting_nodes) for _ in range(len(starting_nodes))],
            seed=self._produce_random_seed(),
        )

    def _check_parameters_in_input(self):
        super(TaxiMDP, self)._check_parameters_in_input()

        assert (
            self._failure_delivery_r.mean()
            < self._default_r.mean()
            < self._successfully_delivery_r.mean()
        )
        assert self._size > 3
        assert self.n_locations > (1 if self.is_episodic() else 2)
        assert self._size > self._length
        assert self._size > self._width
        assert self._size > self._space / 2
        assert self._size > 2 * self.n_locations ** 0.5
        assert self._optimal_mean_reward - 0.1 > self._sub_optimal_mean_reward

        dists = [
            self._default_r,
            self._failure_delivery_r,
            self._successfully_delivery_r,
        ]
        check_distributions(
            dists,
            self._make_reward_stochastic,
        )

    def _get_grid_representation(self, node: "NODE_TYPE") -> np.ndarray:
        grid = np.zeros((self._size, self._size), dtype=str)
        grid[:, :] = "X"
        for coo_x, coo_y in self._admissible_coordinate:
            grid[coo_x, coo_y] = " "

        grid[node.XDest, node.YDest] = "D"
        if node.XPass != -1:
            grid[node.XPass, node.YPass] = "P"
        grid[node.X, node.Y] = "A"
        return grid[::-1, :]

    @property
    def _possible_starting_nodes(self) -> List["NODE_TYPE"]:
        return self._starting_node_sampler.next_states

    @property
    def parameters(self) -> Dict[str, Any]:
        return {
            **super(TaxiMDP, self).parameters,
            **dict(
                size=self._size,
                length=self._length,
                width=self._width,
                space=self._space,
                n_locations=self._n_locations,
                optimal_mean_reward=self._optimal_mean_reward,
                sub_optimal_mean_reward=self._sub_optimal_mean_reward,
                default_r=self._default_r,
                successfully_delivery_r=self._successfully_delivery_r,
                failure_delivery_r=self._failure_delivery_r,
            ),
        }

    def __init__(
        self,
        seed: int,
        size: int,
        length=2,
        width=1,
        space=1,
        n_locations=2 ** 2,
        optimal_mean_reward: float = 0.9,
        sub_optimal_mean_reward: float = 0.2,
        default_r: Union[Tuple, rv_continuous] = None,
        successfully_delivery_r: Union[Tuple, rv_continuous] = None,
        failure_delivery_r: Union[Tuple, rv_continuous] = None,
        make_reward_stochastic=False,
        reward_variance_multiplier: float = 1.0,
        **kwargs,
    ):
        """
        Parameters
        ----------
        seed : int
            The seed used for sampling rewards and next states.
        size : int
            The size of the grid.
        length : int
            The length of the walls.
        width : int
            The width of the walls.
        space : int
            The space between walls.
        n_locations : int
            The number of possible spawn locations. It must be a perfect square.
        optimal_mean_reward : float
            If the rewards are made stochastic, this parameter controls the mean reward for the optimal trajectory.
            By default, it is set to 0.9.
        sub_optimal_mean_reward : float
            If the rewards are made stochastic, this parameter controls the mean reward for suboptimal trajectories.
            By default, it is set to 0.2.
        default_r : Union[Tuple, rv_continuous]
            The reward distribution for any other transition. It can be either passed as a tuple containing Beta
            parameters or as a rv_continuous object.
        successfully_delivery_r : Union[Tuple, rv_continuous]
            The reward distribution for successfully delivering a passenger. It can be either passed as a tuple
            containing Beta parameters or as a rv_continuous object.
        failure_delivery_r : Union[Tuple, rv_continuous]
            The reward distribution for failing to deliver a passenger. It can be either passed as a tuple containing
            Beta parameters or as a rv_continuous object.
        make_reward_stochastic : bool
            If True, the rewards of the MDP will be stochastic. By default, it is set to False.
        reward_variance_multiplier : float
            A constant that can be used to increase the variance of the reward distributions without changing their
            means. The lower the value, the higher the variance. By default, it is set to 1.
        """

        if type(successfully_delivery_r) == tuple:
            successfully_delivery_r = get_dist(
                successfully_delivery_r[0], successfully_delivery_r[1]
            )
        if type(failure_delivery_r) == tuple:
            failure_delivery_r = get_dist(failure_delivery_r[0], failure_delivery_r[1])

        if type(default_r) == tuple:
            default_r = get_dist(default_r[0], default_r[1])

        self._size = size
        self._length = length
        self._width = width
        self._space = space
        self.n_locations = n_locations
        self._n_locations = int(np.ceil(n_locations ** 0.5) ** 2)
        self._optimal_mean_reward = optimal_mean_reward
        self._sub_optimal_mean_reward = sub_optimal_mean_reward
        self._locations = []

        dists = [default_r, successfully_delivery_r, failure_delivery_r]
        if dists.count(None) == 0:
            self._default_r = default_r
            self._successfully_delivery_r = successfully_delivery_r
            self._failure_delivery_r = failure_delivery_r
        else:
            if make_reward_stochastic:
                self._default_r = beta(
                    reward_variance_multiplier,
                    reward_variance_multiplier * (1 / sub_optimal_mean_reward - 1),
                )
                self._successfully_delivery_r = beta(
                    reward_variance_multiplier,
                    reward_variance_multiplier * (1 / optimal_mean_reward - 1),
                )
                self._failure_delivery_r = beta(
                    reward_variance_multiplier,
                    reward_variance_multiplier * (10 / sub_optimal_mean_reward - 1),
                )
            else:
                self._default_r = deterministic(0.1)
                self._successfully_delivery_r = deterministic(1)
                self._failure_delivery_r = deterministic(0)

        kwargs[
            "randomize_actions"
        ] = False  # TODO : double check whether this is actually necessary or not

        super(TaxiMDP, self).__init__(
            seed=seed,
            reward_variance_multiplier=reward_variance_multiplier,
            make_reward_stochastic=make_reward_stochastic,
            **kwargs,
        )
class
TaxiAction(enum.IntEnum):
The actions available in the Taxi MDP.
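A minimal sketch of how the action enum can be used (the values and the relation to n_actions come from the source above):

    from colosseum.mdp.taxi.base import TaxiAction

    action = TaxiAction.MoveNorth
    print(int(action))      # 1
    print(len(TaxiAction))  # 6, which is what TaxiMDP.n_actions returns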
Inherited Members
- enum.Enum
- name
- value
- builtins.int
- conjugate
- bit_length
- to_bytes
- from_bytes
- as_integer_ratio
- real
- imag
- numerator
- denominator
@dataclass(frozen=True)
class
TaxiNode:
The node for the Taxi MDP.
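A short illustration of the node container (the field values here are arbitrary; the string format comes from __str__ in the source above):

    from colosseum.mdp.taxi.base import TaxiNode

    node = TaxiNode(X=0, Y=1, XPass=3, YPass=4, XDest=2, YDest=2)
    print(node)  # X=0,Y=1,XPass=3,YPass=4,XDest=2,YDest=2

    # XPass == -1 and YPass == -1 encode that the passenger is on board.
    on_board = TaxiNode(X=0, Y=1, XPass=-1, YPass=-1, XDest=2, YDest=2)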
class
TaxiMDP(colosseum.mdp.base.BaseMDP, abc.ABC):
The base class for the Taxi family.
TaxiMDP( seed: int, size: int, length=2, width=1, space=1, n_locations=4, optimal_mean_reward: float = 0.9, sub_optimal_mean_reward: float = 0.2, default_r: Union[Tuple, scipy.stats._distn_infrastructure.rv_continuous] = None, successfully_delivery_r: Union[Tuple, scipy.stats._distn_infrastructure.rv_continuous] = None, failure_delivery_r: Union[Tuple, scipy.stats._distn_infrastructure.rv_continuous] = None, make_reward_stochastic=False, reward_variance_multiplier: float = 1.0, **kwargs)
Parameters
- seed (int): The seed used for sampling rewards and next states.
- size (int): The size of the grid.
- length (int): The length of the walls.
- width (int): The width of the walls.
- space (int): The space between walls.
- n_locations (int): The number of possible spawn locations. It must be a perfect square.
- optimal_mean_reward (float): If the rewards are made stochastic, this parameter controls the mean reward for the optimal trajectory. By default, it is set to 0.9.
- sub_optimal_mean_reward (float): If the rewards are made stochastic, this parameter controls the mean reward for suboptimal trajectories. By default, it is set to 0.2.
- default_r (Union[Tuple, rv_continuous]): The reward distribution for any other transition. It can be either passed as a tuple containing Beta parameters or as a rv_continuous object (see the construction sketch after this list).
- successfully_delivery_r (Union[Tuple, rv_continuous]): The reward distribution for successfully delivering a passenger. It can be either passed as a tuple containing Beta parameters or as a rv_continuous object.
- failure_delivery_r (Union[Tuple, rv_continuous]): The reward distribution for failing to deliver a passenger. It can be either passed as a tuple containing Beta parameters or as a rv_continuous object.
- make_reward_stochastic (bool): If True, the rewards of the MDP will be stochastic. By default, it is set to False.
- reward_variance_multiplier (float): A constant that can be used to increase the variance of the reward distributions without changing their means. The lower the value, the higher the variance. By default, it is set to 1.
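A minimal construction sketch. TaxiMDP itself is abstract, so this assumes a concrete episodic subclass named TaxiEpisodic is importable from colosseum.mdp (following the naming convention of the other MDP families); the reward distributions use the ("beta", (alpha, beta)) tuple format described above.

    from colosseum.mdp import TaxiEpisodic  # assumed concrete subclass

    mdp = TaxiEpisodic(
        seed=42,
        size=5,
        n_locations=4,  # must be a perfect square
        make_reward_stochastic=True,
        default_r=("beta", (1.0, 4.0)),                  # mean 0.2
        successfully_delivery_r=("beta", (1.0, 1 / 9)),  # mean 0.9
        failure_delivery_r=("beta", (1.0, 49.0)),        # mean 0.02
    )

If make_reward_stochastic is True and no explicit distributions are passed, the constructor derives Beta distributions from optimal_mean_reward and sub_optimal_mean_reward instead, as shown in the source above.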
@staticmethod
def
get_unique_symbols() -> List[str]:
Returns
- List[str]: the unique symbols of the grid representation of the MDP.
@staticmethod
def
does_seed_change_MDP_structure() -> bool:
Returns
- bool: True if changing the seed changes the transition and/or reward matrices. This may happen, for example, when there are fewer starting states than possible ones and the effective starting states are picked randomly based on the seed.
@staticmethod
def
sample_mdp_parameters(n: int, is_episodic: bool, seed: int = None) -> List[Dict[str, Any]]:
Returns
- List[Dict[str, Any]]: n sampled parameters that can be used to construct an MDP in a reasonable amount of time.
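A brief usage sketch: since this is a static method it can be called on the abstract class, and each returned dictionary can be unpacked into a constructor (again assuming the hypothetical TaxiEpisodic subclass from the sketch above).

    from colosseum.mdp.taxi.base import TaxiMDP
    from colosseum.mdp import TaxiEpisodic  # assumed concrete subclass

    params = TaxiMDP.sample_mdp_parameters(n=3, is_episodic=True, seed=0)
    mdps = [TaxiEpisodic(seed=i, **prms) for i, prms in enumerate(params)]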
@staticmethod
def
get_node_class() -> Type[Union[colosseum.mdp.custom_mdp.CustomNode, colosseum.mdp.river_swim.base.RiverSwimNode, colosseum.mdp.deep_sea.base.DeepSeaNode, colosseum.mdp.frozen_lake.base.FrozenLakeNode, colosseum.mdp.simple_grid.base.SimpleGridNode, colosseum.mdp.minigrid_empty.base.MiniGridEmptyNode, colosseum.mdp.minigrid_rooms.base.MiniGridRoomsNode, colosseum.mdp.taxi.base.TaxiNode]]:
Returns
- Type["NODE_TYPE"]: The class of the nodes of the MDP.
def
get_gin_parameters(self, index: int) -> str:
Returns
- str: The gin config of the MDP instance.
Inherited Members
- colosseum.mdp.base.BaseMDP
- get_available_hardness_measures
- produce_gin_file_from_mdp_parameters
- is_episodic
- sample_parameters
- get_grid_representation
- get_gin_config
- get_node_labels
- get_node_action_labels
- hash
- instantiate_MDP
- T
- R
- recurrent_nodes_set
- communication_class
- get_optimal_policy
- get_worst_policy
- get_value_functions
- optimal_value_functions
- worst_value_functions
- random_value_functions
- optimal_transition_probabilities
- worst_transition_probabilities
- random_transition_probabilities
- optimal_markov_chain
- worst_markov_chain
- random_markov_chain
- get_stationary_distribution
- optimal_stationary_distribution
- worst_stationary_distribution
- random_stationary_distribution
- optimal_average_rewards
- worst_average_rewards
- random_average_rewards
- get_average_reward
- optimal_average_reward
- worst_average_reward
- random_average_reward
- transition_matrix_and_rewards
- graph_layout
- graph_metrics
- diameter
- sum_reciprocals_suboptimality_gaps
- discounted_value_norm
- undiscounted_value_norm
- value_norm
- measures_of_hardness
- summary
- hardness_report
- get_info_class
- get_transition_distributions
- get_reward_distribution
- sample_reward
- get_measure_from_name
- action_spec
- observation_spec
- get_observation
- reset
- step
- random_steps
- random_step
- get_visitation_counts
- reset_visitation_counts
- get_value_node_labels
- dm_env._environment.Environment
- reward_spec
- discount_spec
- close