colosseum.emission_maps.base

  1import abc
  2import importlib
  3import re
  4from typing import TYPE_CHECKING, Dict, Any, Type, Tuple
  5
  6import numpy as np
  7
  8from colosseum import config
  9
 10if TYPE_CHECKING:
 11    from colosseum.mdp import NODE_TYPE
 12    from colosseum.mdp.base import BaseMDP
 13    from colosseum.noises.base import Noise
 14
 15
 16class EmissionMap(abc.ABC):
 17    """
 18    The base class to define emission maps that transform tabular MDPs into non-tabular versions.
 19    """
 20
 21    @property
 22    @abc.abstractmethod
 23    def is_tabular(self) -> bool:
 24        """
 25        Returns
 26        -------
 27        bool
 28            The boolean for whether the emission map is tabular.
 29        """
 30
 31    @abc.abstractmethod
 32    def node_to_observation(
 33        self, node: "NODE_TYPE", in_episode_time: int = None
 34    ) -> np.ndarray:
 35        """
 36        Returns
 37        -------
 38        np.ndarray
 39            The non-tabular representation corresponding to the state given in input. Episodic MDPs also requires the
 40            current in-episode time step.
 41        """
 42
 43    @property
 44    def shape(self) -> Tuple[int, ...]:
 45        """
 46        Returns
 47        -------
 48        Tuple[int, ...]
 49            The shape of the non-tabular representation.
 50        """
 51        if self._shape is None:
 52            self._shape = self.node_to_observation(self._mdp.starting_nodes[0], 0).shape
 53        return self._shape
 54
 55    @property
 56    def all_observations(self) -> np.ndarray:
 57        """
 58        Returns
 59        -------
 60        np.ndarray
 61            The numpy array containing all the non-tabular representation for the states in the MDP. In the episodic
 62            case, it is episode length by number of state by number of action. In the continuous case, the dimension for
 63            the episode is dropped.
 64        """
 65        if self._observations is None:
 66            if self._mdp.is_episodic():
 67                self._observations = np.empty(
 68                    (self._mdp.H, self._mdp.n_states, *self.shape), np.float32
 69                )
 70            else:
 71                self._observations = np.empty(
 72                    (self._mdp.n_states, *self.shape), np.float32
 73                )
 74
 75            for i, n in enumerate(self._mdp.G.nodes):
 76                if self._mdp.is_episodic():
 77                    for h in range(self._mdp.H):
 78                        self._observations[h, i] = self.node_to_observation(n, h)
 79                else:
 80                    self._observations[i] = self.node_to_observation(n, None)
 81        return self._observations
 82
 83    def __init__(
 84        self,
 85        mdp: "BaseMDP",
 86        noise_class: Type["Noise"],
 87        noise_kwargs: Dict[str, Any],
 88    ):
 89        """
 90        Parameters
 91        ----------
 92        mdp : BaseMDP
 93            The tabular MDP.
 94        noise_class : Type["Noise"]
 95            The noise that renders the emission map stochastic.
 96        noise_kwargs : Dict[str, Any]
 97            The parameters for the noise class.
 98        """
 99
100        self._mdp = mdp
101        self._cached_obs = dict()
102        self._observations = None
103        self._shape = None
104
105        if noise_class is not None:
106            self._noise_map = noise_class(shape_f=lambda: self.shape, **noise_kwargs)
107        else:
108            self._noise_map = None
109
110    def get_observation(
111        self, state: "NODE_TYPE", in_episode_time: int = None
112    ) -> np.ndarray:
113        """
114        computes the observation numpy array corresponding to the state in input.
115
116        Parameters
117        ----------
118        state : NODE_TYPE
119            The state for which we are computing the observation.
120        in_episode_time : int
121            The in episode time. It is ignored in the continuous setting, and, by default, it is set to None.
122
123        Returns
124        -------
125        np.ndarray
126            The observation.
127        """
128
129        if self._mdp.is_episodic():
130            if in_episode_time is None:
131                in_episode_time = self._mdp.h
132            if in_episode_time >= self._mdp.H:
133                return np.zeros(self.shape, np.float32)
134        if not self._mdp.is_episodic():
135            in_episode_time = None
136        obs = self.all_observations[in_episode_time, self._mdp.node_to_index[state]]
137        if self._noise_map is not None:
138            return obs + next(self._noise_map)
139        return obs
140
141
class StateLinear(EmissionMap, abc.ABC):
    """
    The base class for the emission maps whose non-tabular representation is a vector in which the value function of
    a given policy is linear.
    """

    def __init__(
        self,
        mdp: "BaseMDP",
        noise_class: Type["Noise"],
        noise_kwargs: Dict[str, Any],
        d: int = None,
    ):
        """
        Parameters
        ----------
        mdp : BaseMDP
            The tabular MDP.
        noise_class : Type["Noise"]
            The noise that renders the emission map stochastic.
        noise_kwargs : Dict[str, Any]
            The parameters for the noise class.
        d : int
            The dimensionality of the non-tabular representation vector. By default, it is the largest between the
            minimum linear feature dimension from the package configuration and a tenth of the number of states.
        """

        # These attributes are assigned before the parent constructor runs, as in the original statement order.
        self._features = None
        if d is None:
            d = max(config.get_min_linear_feature_dim(), int(mdp.n_states * 0.1))
        self._d = d

        super().__init__(mdp, noise_class, noise_kwargs)

    @property
    def is_tabular(self) -> bool:
        return False

    @property
    @abc.abstractmethod
    def V(self) -> np.ndarray:
        """
        Returns
        -------
        np.ndarray
            The value function w.r.t. which the non-tabular representation is linear.
        """

    def _sample_features(self):
        # Draws the feature array once and stores it as float32.
        value_function = self.V
        horizon = self._mdp.H if self._mdp.is_episodic() else None
        sampled = _sample_linear_value_features(value_function, self._d, horizon)
        self._features = sampled.astype(np.float32)

    def node_to_observation(
        self, node: "NODE_TYPE", in_episode_time: int = None
    ) -> np.ndarray:
        # The features are sampled lazily, on the first request.
        if self._features is None:
            self._sample_features()

        # NOTE(review): for an episodic MDP called with in_episode_time None, the state index below is applied to
        # the first axis of the (H + 1, n_states, d) feature array -- confirm this is intended.
        if in_episode_time is None or not self._mdp.is_episodic():
            return self._features[self._mdp.node_to_index[node]]
        return self._features[in_episode_time, self._mdp.node_to_index[node]]
204
205
def get_emission_map_from_name(emission_map_name: str) -> Type[EmissionMap]:
    """
    Imports the module that defines an emission map and returns the emission map class.

    The module name is obtained by inserting an underscore before every upper case letter of the class name (except
    a leading one) and lower-casing the result, e.g. a class named ``FooBar`` is looked up in
    ``colosseum.emission_maps.foo_bar``.

    Parameters
    ----------
    emission_map_name : str
        The name of the emission map class.

    Returns
    -------
    EmissionMap
        The EmissionMap class corresponding to the name of the emission map given in input.

    Raises
    ------
    ModuleNotFoundError
        If the module derived from the name cannot be imported.
    AttributeError
        If the imported module does not define the requested name.
    """
    module_name = re.sub(r"(?<!^)(?=[A-Z])", "_", emission_map_name).lower()
    module = importlib.import_module(f"colosseum.emission_maps.{module_name}")
    # getattr is the idiomatic attribute lookup and, unlike a direct __getattribute__ call, also honours a
    # module-level __getattr__.
    return getattr(module, emission_map_name)
216
217
218def _sample_linear_value_features(v: np.ndarray, d: int, H: int = None):
219    psi = np.random.randn(v.size, d)
220    psi[:, 0] = 1
221    psi[:, 1] = v
222
223    P = psi @ np.linalg.inv(psi.T @ psi) @ psi.T
224
225    W = np.random.randn(v.size, d)
226    W[:, 0] = 1
227
228    W_p = P @ W
229    features = W_p / np.linalg.norm(W_p, axis=0, keepdims=True)
230    if H is not None:
231        features = features.reshape(H + 1, -1, d)
232    return features
233
234
235def _get_symbol_mapping(mdp: "BaseMDP") -> Dict[str, int]:
236    symbols = mdp.get_unique_symbols()
237    return dict(zip(symbols, range(len(symbols))))
class EmissionMap(abc.ABC):
 17class EmissionMap(abc.ABC):
 18    """
 19    The base class to define emission maps that transform tabular MDPs into non-tabular versions.
 20    """
 21
 22    @property
 23    @abc.abstractmethod
 24    def is_tabular(self) -> bool:
 25        """
 26        Returns
 27        -------
 28        bool
 29            The boolean for whether the emission map is tabular.
 30        """
 31
 32    @abc.abstractmethod
 33    def node_to_observation(
 34        self, node: "NODE_TYPE", in_episode_time: int = None
 35    ) -> np.ndarray:
 36        """
 37        Returns
 38        -------
 39        np.ndarray
 40            The non-tabular representation corresponding to the state given in input. Episodic MDPs also requires the
 41            current in-episode time step.
 42        """
 43
 44    @property
 45    def shape(self) -> Tuple[int, ...]:
 46        """
 47        Returns
 48        -------
 49        Tuple[int, ...]
 50            The shape of the non-tabular representation.
 51        """
 52        if self._shape is None:
 53            self._shape = self.node_to_observation(self._mdp.starting_nodes[0], 0).shape
 54        return self._shape
 55
 56    @property
 57    def all_observations(self) -> np.ndarray:
 58        """
 59        Returns
 60        -------
 61        np.ndarray
 62            The numpy array containing all the non-tabular representation for the states in the MDP. In the episodic
 63            case, it is episode length by number of state by number of action. In the continuous case, the dimension for
 64            the episode is dropped.
 65        """
 66        if self._observations is None:
 67            if self._mdp.is_episodic():
 68                self._observations = np.empty(
 69                    (self._mdp.H, self._mdp.n_states, *self.shape), np.float32
 70                )
 71            else:
 72                self._observations = np.empty(
 73                    (self._mdp.n_states, *self.shape), np.float32
 74                )
 75
 76            for i, n in enumerate(self._mdp.G.nodes):
 77                if self._mdp.is_episodic():
 78                    for h in range(self._mdp.H):
 79                        self._observations[h, i] = self.node_to_observation(n, h)
 80                else:
 81                    self._observations[i] = self.node_to_observation(n, None)
 82        return self._observations
 83
 84    def __init__(
 85        self,
 86        mdp: "BaseMDP",
 87        noise_class: Type["Noise"],
 88        noise_kwargs: Dict[str, Any],
 89    ):
 90        """
 91        Parameters
 92        ----------
 93        mdp : BaseMDP
 94            The tabular MDP.
 95        noise_class : Type["Noise"]
 96            The noise that renders the emission map stochastic.
 97        noise_kwargs : Dict[str, Any]
 98            The parameters for the noise class.
 99        """
100
101        self._mdp = mdp
102        self._cached_obs = dict()
103        self._observations = None
104        self._shape = None
105
106        if noise_class is not None:
107            self._noise_map = noise_class(shape_f=lambda: self.shape, **noise_kwargs)
108        else:
109            self._noise_map = None
110
111    def get_observation(
112        self, state: "NODE_TYPE", in_episode_time: int = None
113    ) -> np.ndarray:
114        """
115        computes the observation numpy array corresponding to the state in input.
116
117        Parameters
118        ----------
119        state : NODE_TYPE
120            The state for which we are computing the observation.
121        in_episode_time : int
122            The in episode time. It is ignored in the continuous setting, and, by default, it is set to None.
123
124        Returns
125        -------
126        np.ndarray
127            The observation.
128        """
129
130        if self._mdp.is_episodic():
131            if in_episode_time is None:
132                in_episode_time = self._mdp.h
133            if in_episode_time >= self._mdp.H:
134                return np.zeros(self.shape, np.float32)
135        if not self._mdp.is_episodic():
136            in_episode_time = None
137        obs = self.all_observations[in_episode_time, self._mdp.node_to_index[state]]
138        if self._noise_map is not None:
139            return obs + next(self._noise_map)
140        return obs

The base class to define emission maps that transform tabular MDPs into non-tabular versions.

EmissionMap( mdp: colosseum.mdp.base.BaseMDP, noise_class: Type[colosseum.noises.base.Noise], noise_kwargs: Dict[str, Any])
 84    def __init__(
 85        self,
 86        mdp: "BaseMDP",
 87        noise_class: Type["Noise"],
 88        noise_kwargs: Dict[str, Any],
 89    ):
 90        """
 91        Parameters
 92        ----------
 93        mdp : BaseMDP
 94            The tabular MDP.
 95        noise_class : Type["Noise"]
 96            The noise that renders the emission map stochastic.
 97        noise_kwargs : Dict[str, Any]
 98            The parameters for the noise class.
 99        """
100
101        self._mdp = mdp
102        self._cached_obs = dict()
103        self._observations = None
104        self._shape = None
105
106        if noise_class is not None:
107            self._noise_map = noise_class(shape_f=lambda: self.shape, **noise_kwargs)
108        else:
109            self._noise_map = None
Parameters
  • mdp (BaseMDP): The tabular MDP.
  • noise_class (Type["Noise"]): The noise that renders the emission map stochastic.
  • noise_kwargs (Dict[str, Any]): The parameters for the noise class.
is_tabular: bool
Returns
  • bool: The boolean for whether the emission map is tabular.
32    @abc.abstractmethod
33    def node_to_observation(
34        self, node: "NODE_TYPE", in_episode_time: int = None
35    ) -> np.ndarray:
36        """
37        Returns
38        -------
39        np.ndarray
40            The non-tabular representation corresponding to the state given in input. Episodic MDPs also requires the
41            current in-episode time step.
42        """
Returns
  • np.ndarray: The non-tabular representation corresponding to the state given in input. Episodic MDPs also require the current in-episode time step.
shape: Tuple[int, ...]
Returns
  • Tuple[int, ...]: The shape of the non-tabular representation.
all_observations: numpy.ndarray
Returns
  • np.ndarray: The numpy array containing all the non-tabular representations for the states in the MDP. In the episodic case, its shape is episode length by number of states by the shape of a single observation. In the continuous case, the dimension for the episode is dropped.
111    def get_observation(
112        self, state: "NODE_TYPE", in_episode_time: int = None
113    ) -> np.ndarray:
114        """
115        computes the observation numpy array corresponding to the state in input.
116
117        Parameters
118        ----------
119        state : NODE_TYPE
120            The state for which we are computing the observation.
121        in_episode_time : int
122            The in episode time. It is ignored in the continuous setting, and, by default, it is set to None.
123
124        Returns
125        -------
126        np.ndarray
127            The observation.
128        """
129
130        if self._mdp.is_episodic():
131            if in_episode_time is None:
132                in_episode_time = self._mdp.h
133            if in_episode_time >= self._mdp.H:
134                return np.zeros(self.shape, np.float32)
135        if not self._mdp.is_episodic():
136            in_episode_time = None
137        obs = self.all_observations[in_episode_time, self._mdp.node_to_index[state]]
138        if self._noise_map is not None:
139            return obs + next(self._noise_map)
140        return obs

Computes the observation numpy array corresponding to the state given in input.

Parameters
  • state (NODE_TYPE): The state for which we are computing the observation.
  • in_episode_time (int): The in episode time. It is ignored in the continuous setting, and, by default, it is set to None.
Returns
  • np.ndarray: The observation.
class StateLinear(EmissionMap, abc.ABC):
class StateLinear(EmissionMap, abc.ABC):
    """
    The base class for the emission maps whose non-tabular representation is a vector in which the value function of
    a given policy is linear.
    """

    def __init__(
        self,
        mdp: "BaseMDP",
        noise_class: Type["Noise"],
        noise_kwargs: Dict[str, Any],
        d: int = None,
    ):
        """
        Parameters
        ----------
        mdp : BaseMDP
            The tabular MDP.
        noise_class : Type["Noise"]
            The noise that renders the emission map stochastic.
        noise_kwargs : Dict[str, Any]
            The parameters for the noise class.
        d : int
            The dimensionality of the non-tabular representation vector. By default, it is the largest between the
            minimum linear feature dimension from the package configuration and a tenth of the number of states.
        """

        # These attributes are assigned before the parent constructor runs, as in the original statement order.
        self._features = None
        if d is None:
            d = max(config.get_min_linear_feature_dim(), int(mdp.n_states * 0.1))
        self._d = d

        super().__init__(mdp, noise_class, noise_kwargs)

    @property
    def is_tabular(self) -> bool:
        return False

    @property
    @abc.abstractmethod
    def V(self) -> np.ndarray:
        """
        Returns
        -------
        np.ndarray
            The value function w.r.t. which the non-tabular representation is linear.
        """

    def _sample_features(self):
        # Draws the feature array once and stores it as float32.
        value_function = self.V
        horizon = self._mdp.H if self._mdp.is_episodic() else None
        sampled = _sample_linear_value_features(value_function, self._d, horizon)
        self._features = sampled.astype(np.float32)

    def node_to_observation(
        self, node: "NODE_TYPE", in_episode_time: int = None
    ) -> np.ndarray:
        # The features are sampled lazily, on the first request.
        if self._features is None:
            self._sample_features()

        # When no in-episode time is given, or the MDP is not episodic, only the state index is used.
        if in_episode_time is None or not self._mdp.is_episodic():
            return self._features[self._mdp.node_to_index[node]]
        return self._features[in_episode_time, self._mdp.node_to_index[node]]

The base class for the emission map such that the non-tabular representation is a vector for which the value function of a given policy is linear.

StateLinear( mdp: colosseum.mdp.base.BaseMDP, noise_class: Type[colosseum.noises.base.Noise], noise_kwargs: Dict[str, Any], d: int = None)
149    def __init__(
150        self,
151        mdp: "BaseMDP",
152        noise_class: Type["Noise"],
153        noise_kwargs: Dict[str, Any],
154        d: int = None,
155    ):
156        """
157        Parameters
158        ----------
159        mdp : BaseMDP
160            The tabular MDP.
161        noise_class : Type["Noise"]
162            The noise that renders the emission map stochastic.
163        noise_kwargs : Dict[str, Any]
164            The parameters for the noise class.
165        d : int
166            The dimensionality of the non-tabular representation vector.
167        """
168
169        self._features = None
170        self._d = (
171            max(config.get_min_linear_feature_dim(), int(mdp.n_states * 0.1))
172            if d is None
173            else d
174        )
175
176        super(StateLinear, self).__init__(mdp, noise_class, noise_kwargs)
Parameters
  • mdp (BaseMDP): The tabular MDP.
  • noise_class (Type["Noise"]): The noise that renders the emission map stochastic.
  • noise_kwargs (Dict[str, Any]): The parameters for the noise class.
  • d (int): The dimensionality of the non-tabular representation vector.
is_tabular: bool
Returns
  • bool: The boolean for whether the emission map is tabular.
V: numpy.ndarray
Returns
  • np.ndarray: The value function w.r.t. which the non-tabular representation is linear.
197    def node_to_observation(
198        self, node: "NODE_TYPE", in_episode_time: int = None
199    ) -> np.ndarray:
200        if self._features is None:
201            self._sample_features()
202        if in_episode_time is not None and self._mdp.is_episodic():
203            return self._features[in_episode_time, self._mdp.node_to_index[node]]
204        return self._features[self._mdp.node_to_index[node]]
Returns
  • np.ndarray: The non-tabular representation corresponding to the state given in input. Episodic MDPs also require the current in-episode time step.
def get_emission_map_from_name(emission_map_name: str) -> Type[colosseum.emission_maps.base.EmissionMap]:
def get_emission_map_from_name(emission_map_name: str) -> Type[EmissionMap]:
    """
    Imports the module that defines an emission map and returns the emission map class.

    The module name is obtained by inserting an underscore before every upper case letter of the class name (except
    a leading one) and lower-casing the result, e.g. a class named ``FooBar`` is looked up in
    ``colosseum.emission_maps.foo_bar``.

    Parameters
    ----------
    emission_map_name : str
        The name of the emission map class.

    Returns
    -------
    EmissionMap
        The EmissionMap class corresponding to the name of the emission map given in input.

    Raises
    ------
    ModuleNotFoundError
        If the module derived from the name cannot be imported.
    AttributeError
        If the imported module does not define the requested name.
    """
    module_name = re.sub(r"(?<!^)(?=[A-Z])", "_", emission_map_name).lower()
    module = importlib.import_module(f"colosseum.emission_maps.{module_name}")
    # getattr is the idiomatic attribute lookup and, unlike a direct __getattribute__ call, also honours a
    # module-level __getattr__.
    return getattr(module, emission_map_name)
Returns
  • EmissionMap: The EmissionMap class corresponding to the name of the emission map given in input.