colosseum.mdp.taxi.base

  1import abc
  2from copy import deepcopy
  3from dataclasses import asdict, dataclass
  4from enum import IntEnum
  5from itertools import product
  6from typing import TYPE_CHECKING, Any, Dict, List, Tuple, Type, Union
  7
  8import numpy as np
  9from scipy.stats import beta, rv_continuous
 10
 11from colosseum.mdp import BaseMDP
 12from colosseum.mdp.utils.custom_samplers import NextStateSampler
 13from colosseum.utils.miscellanea import (
 14    check_distributions,
 15    deterministic,
 16    get_dist,
 17    rounding_nested_structure,
 18)
 19
 20if TYPE_CHECKING:
 21    from colosseum.mdp import ACTION_TYPE, NODE_TYPE
 22
 23
 24class TaxiAction(IntEnum):
 25    """
 26    The actions available in the Taxi MDP.
 27    """
 28
 29    MoveSouth = 0
 30    MoveNorth = 1
 31    MoveEast = 2
 32    MoveWest = 3
 33    PickUpPassenger = 4
 34    DropOffPassenger = 5
 35
 36
 37@dataclass(frozen=True)
 38class TaxiNode:
 39    """
 40    The node for the Taxi MDP.
 41    """
 42
 43    X: int
 44    """x coordinate of the taxi."""
 45    Y: int
 46    """y coordinate of the taxi."""
 47    XPass: int
 48    """x coordinate of the passenger, -1 if it is on board."""
 49    YPass: int
 50    """y coordinate of the passenger, -1 if it is on board."""
 51    XDest: int
 52    """x coordinate of the destination."""
 53    YDest: int
 54    """y coordinate of the destination."""
 55
 56    def __str__(self):
 57        return f"X={self.X},Y={self.Y},XPass={self.XPass},YPass={self.YPass},XDest={self.XDest},YDest={self.YDest}"
 58
 59
 60class TaxiMDP(BaseMDP, abc.ABC):
 61    """
 62    The base class for the Taxi family.
 63    """
 64
 65    @staticmethod
 66    def get_unique_symbols() -> List[str]:
 67        return [" ", "A", "X", "D", "P"]
 68
 69    @staticmethod
 70    def does_seed_change_MDP_structure() -> bool:
 71        return True
 72
 73    @staticmethod
 74    def sample_mdp_parameters(
 75        n: int, is_episodic: bool, seed: int = None
 76    ) -> List[Dict[str, Any]]:
 77        rng = np.random.RandomState(np.random.randint(10_000) if seed is None else seed)
 78        samples = []
 79        for _ in range(n):
 80            p_rand, p_lazy, _ = 0.5 * rng.dirichlet([0.2, 0.2, 5])
 81            sample = dict(
 82                size=5
 83                if is_episodic
 84                else rng.choice(range(5, 8), None, True, [0.525, 0.325, 0.15]),
 85                p_rand=p_rand * (0.8 if is_episodic else 1),
 86                p_lazy=p_lazy * (0.8 if is_episodic else 1),
 87                make_reward_stochastic=rng.choice([True, False]),
 88                reward_variance_multiplier=2 * rng.random() + 0.005,
 89            )
 90            sample["p_rand"] = None if sample["p_rand"] < 0.01 else sample["p_rand"]
 91            sample["p_lazy"] = None if sample["p_lazy"] < 0.01 else sample["p_lazy"]
 92
 93            if sample["make_reward_stochastic"]:
 94                sample["default_r"] = (
 95                    "beta",
 96                    (
 97                        sample["reward_variance_multiplier"],
 98                        sample["reward_variance_multiplier"] * (1 / 0.2 - 1),
 99                    ),
100                )
101                sample["successfully_delivery_r"] = (
102                    "beta",
103                    (
104                        sample["reward_variance_multiplier"],
105                        sample["reward_variance_multiplier"] * (1 / 0.9 - 1),
106                    ),
107                )
108                sample["failure_delivery_r"] = (
109                    "beta",
110                    (
111                        sample["reward_variance_multiplier"],
112                        sample["reward_variance_multiplier"] * (10 / 0.2 - 1),
113                    ),
114                )
115            else:
116                sample["default_r"] = ("deterministic", (0.1,))
117                sample["successfully_delivery_r"] = ("deterministic", (1.0,))
118                sample["failure_delivery_r"] = ("deterministic", (0.0,))
119
120            samples.append(rounding_nested_structure(sample))
121        return samples
122
123    @property
124    def _quadrant_width(self):
125        return self._size / int(self._n_locations ** 0.5) / 2
126
127    @property
128    def _admissible_coordinate(self):
129        rows = []
130        j = 0
131        while len(rows) < self._size:
132            if j % 2 != 0:
133                row = []
134            else:
135                row = [0] * int((self._width + self._space) // 2)
136            i = 0
137            while len(row) < self._size:
138                row.append(int(i % (1 + self._space) == 0))
139                if row[-1] == 1:
140                    for _ in range(self._width - 1):
141                        if len(row) == self._size:
142                            break
143                        row.append(1)
144                i += 1
145            for _ in range(self._length):
146                if len(rows) == self._size:
147                    break
148                rows.append(row)
149            if len(rows) < self._size:
150                rows.append([0] * self._size)
151            j += 1
152        return np.vstack(np.where(np.array(rows) == 0)).T.tolist()
153
154    @property
155    def _quadrants(self):
156        quadrants = np.zeros((self._size, self._size))
157        split = np.array_split(range(self._size), int(self._n_locations ** 0.5))
158        for i, (x, y) in enumerate(product(split, split)):
159            for q_coo_x, q_coo_y in product(x, y):
160                quadrants[q_coo_x, q_coo_y] = i
161        quadrants = [
162            list(
163                filter(
164                    lambda x: x in self._admissible_coordinate,
165                    np.vstack(np.where(quadrants == i)).T.tolist(),
166                )
167            )
168            for i in range(self._n_locations)
169        ]
170
171        assert all(len(q) != 0 for q in quadrants)
172        return quadrants
173
174    @property
175    def locations(self):
176        if len(self._locations) == 0:
177            re_sample = True
178            min_distance = max(self._quadrant_width, 2)
179            while re_sample:
180                locations = [
181                    self._quadrants[i][self._rng.randint(len(self._quadrants[i]))]
182                    for i in range(self._n_locations)
183                ]
184                re_sample = False
185                nplocations = np.array(locations)
186                for i in range(self._n_locations):
187                    for j in range(1 + i, self._n_locations):
188                        diff = np.sqrt(((nplocations[i] - nplocations[j]) ** 2).sum())
189                        if diff <= min_distance:
190                            re_sample = True
191                            break
192                    if re_sample:
193                        break
194            self._rng.shuffle(locations)
195            self._locations = locations[: self.n_locations]
196        return self._locations
197
198    @staticmethod
199    def get_node_class() -> Type["NODE_TYPE"]:
200        return TaxiNode
201
202    def get_gin_parameters(self, index: int) -> str:
203        prms = dict(
204            size=self._size,
205            make_reward_stochastic=self._make_reward_stochastic,
206            reward_variance_multiplier=self._reward_variance_multiplier,
207            default_r=(
208                self._default_r.dist.name,
209                self._default_r.args,
210            ),
211            successfully_delivery_r=(
212                self._successfully_delivery_r.dist.name,
213                self._successfully_delivery_r.args,
214            ),
215            failure_delivery_r=(
216                self._failure_delivery_r.dist.name,
217                self._failure_delivery_r.args,
218            ),
219        )
220        if self._p_rand is not None:
221            prms["p_rand"] = self._p_rand
222
223        return TaxiMDP.produce_gin_file_from_mdp_parameters(
224            prms, type(self).__name__, index
225        )
226
227    @property
228    def n_actions(self) -> int:
229        return len(TaxiAction)
230
231    def _get_next_nodes_parameters(
232        self, node: "NODE_TYPE", action: "ACTION_TYPE"
233    ) -> Tuple[Tuple[dict, float], ...]:
234        next_node_prms = asdict(node)
235
236        if action == TaxiAction.DropOffPassenger:
 237            # we have the passenger and we are dropping them off at the right place
238            if node.XPass == -1 and node.X == node.XDest and node.Y == node.YDest:
239                next_nodes_prms = []
240
241                n = 0
242                for pass_loc in filter(
243                    lambda loc: loc != [node.X, node.Y],
244                    self.locations,
245                ):
246                    n += len(list(filter(lambda loc: loc != pass_loc, self.locations)))
247                p = 1.0 / n
248
249                for pass_loc in filter(
250                    lambda loc: loc != [node.X, node.Y],
251                    self.locations,
252                ):
253                    admissible_destinations = list(
254                        filter(lambda loc: loc != pass_loc, self.locations)
255                    )
256
257                    for destination in admissible_destinations:
258                        cur_next_node_prms: dict = deepcopy(next_node_prms)
259                        (
260                            cur_next_node_prms["XPass"],
261                            cur_next_node_prms["YPass"],
262                        ) = pass_loc
263                        (
264                            cur_next_node_prms["XDest"],
265                            cur_next_node_prms["YDest"],
266                        ) = destination
267                        next_nodes_prms.append((cur_next_node_prms, p))
268                return tuple(next_nodes_prms)
269
270        if action == TaxiAction.PickUpPassenger:
271            if node.XPass != -1 and node.X == node.XPass and node.Y == node.YPass:
272                next_node_prms["XPass"] = -1
273                next_node_prms["YPass"] = -1
274
275        if action == TaxiAction.MoveNorth:
276            next_coord = [node.X, node.Y + 1]
277        elif action == TaxiAction.MoveEast:
278            next_coord = [node.X + 1, node.Y]
279        elif action == TaxiAction.MoveSouth:
280            next_coord = [node.X, node.Y - 1]
281        elif action == TaxiAction.MoveWest:
282            next_coord = [node.X - 1, node.Y]
283        else:
284            next_coord = [node.X, node.Y]
285        if next_coord in self._admissible_coordinate:
286            next_node_prms["X"] = next_coord[0]
287            next_node_prms["Y"] = next_coord[1]
288
289        return ((next_node_prms, 1.0),)
290
291    def _get_reward_distribution(
292        self, node: "NODE_TYPE", action: "ACTION_TYPE", next_node: "NODE_TYPE"
293    ) -> rv_continuous:
294        if action == TaxiAction.PickUpPassenger:
295            if next_node.XPass != -1 or node.XPass == -1:
296                # We don't have the passenger
297                return self._failure_delivery_r
298        if action == TaxiAction.DropOffPassenger:
299            if next_node.XPass == -1 or node.XPass != -1:
300                # We didn't drop the passenger in the destination
301                return self._failure_delivery_r
302            elif node.XPass == -1 and next_node.XPass != -1:
303                return self._successfully_delivery_r
304        return self._default_r
305
306    def _get_starting_node_sampler(self) -> NextStateSampler:
307        starting_nodes = []
308        for (
309            (pass_loc_x, pass_loc_y),
310            (destination_x, destination_y),
311            (taxi_x, taxi_y),
312        ) in product(self.locations, self.locations, self._admissible_coordinate):
313            if (pass_loc_x, pass_loc_y) == (destination_x, destination_y):
314                continue
315
316            starting_nodes.append(
317                TaxiNode(
318                    taxi_x, taxi_y, pass_loc_x, pass_loc_y, destination_x, destination_y
319                )
320            )
321        self._rng.shuffle(starting_nodes)
322
323        return NextStateSampler(
324            next_nodes=starting_nodes,
325            probs=[1 / len(starting_nodes) for _ in range(len(starting_nodes))],
326            seed=self._produce_random_seed(),
327        )
328
329    def _check_parameters_in_input(self):
330        super(TaxiMDP, self)._check_parameters_in_input()
331
332        assert (
333            self._failure_delivery_r.mean()
334            < self._default_r.mean()
335            < self._successfully_delivery_r.mean()
336        )
337        assert self._size > 3
338        assert self.n_locations > (1 if self.is_episodic() else 2)
339        assert self._size > self._length
340        assert self._size > self._width
341        assert self._size > self._space / 2
342        assert self._size > 2 * self.n_locations ** 0.5
343        assert self._optimal_mean_reward - 0.1 > self._sub_optimal_mean_reward
344
345        dists = [
346            self._default_r,
347            self._failure_delivery_r,
348            self._successfully_delivery_r,
349        ]
350        check_distributions(
351            dists,
352            self._make_reward_stochastic,
353        )
354
355    def _get_grid_representation(self, node: "NODE_TYPE") -> np.ndarray:
356        grid = np.zeros((self._size, self._size), dtype=str)
357        grid[:, :] = "X"
358        for coo_x, coo_y in self._admissible_coordinate:
359            grid[coo_x, coo_y] = " "
360
361        grid[node.XDest, node.YDest] = "D"
362        if node.XPass != -1:
363            grid[node.XPass, node.YPass] = "P"
364        grid[node.X, node.Y] = "A"
365        return grid[::-1, :]
366
367    @property
368    def _possible_starting_nodes(self) -> List["NODE_TYPE"]:
369        return self._starting_node_sampler.next_states
370
371    @property
372    def parameters(self) -> Dict[str, Any]:
373        return {
374            **super(TaxiMDP, self).parameters,
375            **dict(
376                size=self._size,
377                length=self._length,
378                width=self._width,
379                space=self._space,
380                n_locations=self._n_locations,
381                optimal_mean_reward=self._optimal_mean_reward,
382                sub_optimal_mean_reward=self._sub_optimal_mean_reward,
383                default_r=self._default_r,
384                successfully_delivery_r=self._successfully_delivery_r,
385                failure_delivery_r=self._failure_delivery_r,
386            ),
387        }
388
389    def __init__(
390        self,
391        seed: int,
392        size: int,
393        length=2,
394        width=1,
395        space=1,
396        n_locations=2 ** 2,
397        optimal_mean_reward: float = 0.9,
398        sub_optimal_mean_reward: float = 0.2,
399        default_r: Union[Tuple, rv_continuous] = None,
400        successfully_delivery_r: Union[Tuple, rv_continuous] = None,
401        failure_delivery_r: Union[Tuple, rv_continuous] = None,
402        make_reward_stochastic=False,
403        reward_variance_multiplier: float = 1.0,
404        **kwargs,
405    ):
406        """
407        Parameters
408        ----------
409        seed : int
410            The seed used for sampling rewards and next states.
411        size : int
412            The size of the grid.
413        length : int
414            The length of the walls.
415        width : int
416            The width of the walls.
417        space : int
418            The space between walls.
419        n_locations : int
 420            The number of possible spawn locations. It must be a perfect square.
421        optimal_mean_reward : float
422            If the rewards are made stochastic, this parameter controls the mean reward for the optimal trajectory.
423            By default, it is set to 0.9.
424        sub_optimal_mean_reward: float
425            If the rewards are made stochastic, this parameter controls the mean reward for suboptimal trajectories.
 426            By default, it is set to 0.2.
 427        default_r : Union[Tuple, rv_continuous]
                The reward distribution for any transition that does not correspond to a successful delivery or to a
                failed pick-up/drop-off attempt (e.g. movement actions). It can be either passed as a tuple containing
                Beta parameters or as a rv_continuous object.
 428        successfully_delivery_r : Union[Tuple, rv_continuous]
 429            The reward distribution for successfully delivering a passenger. It can be either passed as a tuple
 430            containing Beta parameters or as a rv_continuous object.
 431        failure_delivery_r : Union[Tuple, rv_continuous]
 432            The reward distribution for failing to deliver a passenger. It can be either passed as a tuple containing
 433            Beta parameters or as a rv_continuous object.
434        make_reward_stochastic : bool
435            If True, the rewards of the MDP will be stochastic. By default, it is set to False.
436        reward_variance_multiplier : float
437            A constant that can be used to increase the variance of the reward distributions without changing their means.
438            The lower the value, the higher the variance. By default, it is set to 1.
439        """
440
441        if type(successfully_delivery_r) == tuple:
442            successfully_delivery_r = get_dist(
443                successfully_delivery_r[0], successfully_delivery_r[1]
444            )
445        if type(failure_delivery_r) == tuple:
446            failure_delivery_r = get_dist(failure_delivery_r[0], failure_delivery_r[1])
447
448        if type(default_r) == tuple:
449            default_r = get_dist(default_r[0], default_r[1])
450
451        self._size = size
452        self._length = length
453        self._width = width
454        self._space = space
455        self.n_locations = n_locations
456        self._n_locations = int(np.ceil(n_locations ** 0.5) ** 2)
457        self._optimal_mean_reward = optimal_mean_reward
458        self._sub_optimal_mean_reward = sub_optimal_mean_reward
459        self._locations = []
460
461        dists = [default_r, successfully_delivery_r, failure_delivery_r]
462        if dists.count(None) == 0:
463            self._default_r = default_r
464            self._successfully_delivery_r = successfully_delivery_r
465            self._failure_delivery_r = failure_delivery_r
466        else:
467            if make_reward_stochastic:
468                self._default_r = beta(
469                    reward_variance_multiplier,
470                    reward_variance_multiplier * (1 / sub_optimal_mean_reward - 1),
471                )
472                self._successfully_delivery_r = beta(
473                    reward_variance_multiplier,
474                    reward_variance_multiplier * (1 / optimal_mean_reward - 1),
475                )
476                self._failure_delivery_r = beta(
477                    reward_variance_multiplier,
478                    reward_variance_multiplier * (10 / sub_optimal_mean_reward - 1),
479                )
480            else:
481                self._default_r = deterministic(0.1)
482                self._successfully_delivery_r = deterministic(1)
483                self._failure_delivery_r = deterministic(0)
484
485        kwargs[
486            "randomize_actions"
487        ] = False  # TODO : double check whether this is actually necessary or not
488
489        super(TaxiMDP, self).__init__(
490            seed=seed,
491            reward_variance_multiplier=reward_variance_multiplier,
492            make_reward_stochastic=make_reward_stochastic,
493            **kwargs,
494        )
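
The following is a minimal usage sketch, not part of the module above: it mirrors the Beta fallback of TaxiMDP.__init__ from the listing to show that reward_variance_multiplier rescales the variance of the reward distributions without changing their means (the numeric values are illustrative assumptions).

import numpy as np
from scipy.stats import beta

optimal_mean_reward = 0.9  # constructor default
for reward_variance_multiplier in (0.3, 1.0, 3.0):  # illustrative values
    # Same construction as the fallback in TaxiMDP.__init__ when no
    # successfully_delivery_r is passed.
    successfully_delivery_r = beta(
        reward_variance_multiplier,
        reward_variance_multiplier * (1 / optimal_mean_reward - 1),
    )
    # Beta(a, b) has mean a / (a + b), which here simplifies to
    # optimal_mean_reward regardless of the multiplier, while the variance
    # shrinks as the multiplier grows.
    assert np.isclose(successfully_delivery_r.mean(), optimal_mean_reward)
    print(reward_variance_multiplier, successfully_delivery_r.var())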
class TaxiMDP(colosseum.mdp.base.BaseMDP, abc.ABC):
The base class for the Taxi family.

@staticmethod
def get_unique_symbols() -> List[str]:
Returns
  • List[str]: the unique symbols of the grid representation of the MDP.
@staticmethod
def does_seed_change_MDP_structure() -> bool:
Returns
  • bool: True if changing the seed changes the transition matrix and/or the rewards matrix. This may happen, for example, when there are fewer effective starting states than possible ones and the starting states are picked randomly based on the seed.
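
For the Taxi family the seed does affect the structure, since the special locations are drawn from the seeded random number generator (see the locations property in the listing above). A small hedged check follows; the concrete subclass and its import path appear only as commented-out assumptions.

from colosseum.mdp.taxi.base import TaxiMDP

# The Taxi family reports that its structure depends on the seed.
print(TaxiMDP.does_seed_change_MDP_structure())  # True

# Hypothetical illustration with a concrete subclass (class name and import
# path are assumptions, not given by this module):
# from colosseum.mdp.taxi.finite_horizon import TaxiEpisodic
# assert TaxiEpisodic(seed=0, size=5).locations != TaxiEpisodic(seed=1, size=5).locations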
@staticmethod
def sample_mdp_parameters(n: int, is_episodic: bool, seed: int = None) -> List[Dict[str, Any]]:
Returns
  • List[Dict[str, Any]]: n sampled parameters that can be used to construct an MDP in a reasonable amount of time.
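
A hedged usage sketch, not part of the documentation above: the static method can be called on the class itself, and each returned dictionary holds constructor keyword arguments; the commented lines assume a hypothetical concrete subclass.

from colosseum.mdp.taxi.base import TaxiMDP

# Draw two parameter dictionaries for the episodic setting.
params = TaxiMDP.sample_mdp_parameters(n=2, is_episodic=True, seed=42)
for prm in params:
    # Keys include "size", "p_rand", "p_lazy", "make_reward_stochastic" and the
    # reward distribution tuples, e.g. ("beta", (a, b)) or ("deterministic", (0.1,)).
    print(prm["size"], prm["make_reward_stochastic"], prm["default_r"])

# Hypothetical: building an MDP from a sample (concrete class and path assumed):
# from colosseum.mdp.taxi.finite_horizon import TaxiEpisodic
# mdp = TaxiEpisodic(seed=0, **params[0])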
def get_gin_parameters(self, index: int) -> str:
Returns
  • str: The gin config of the MDP instance.
n_actions: int
Returns
  • int: The number of available actions.
parameters: Dict[str, Any]
Returns
  • Dict[str, Any]: The parameters of the MDP.