colosseum.emission_maps.base
import abc
import importlib
import re
from typing import TYPE_CHECKING, Dict, Any, Optional, Type, Tuple

import numpy as np

from colosseum import config

if TYPE_CHECKING:
    from colosseum.mdp import NODE_TYPE
    from colosseum.mdp.base import BaseMDP
    from colosseum.noises.base import Noise


class EmissionMap(abc.ABC):
    """
    The base class to define emission maps that transform tabular MDPs into non-tabular versions.
    """

    @property
    @abc.abstractmethod
    def is_tabular(self) -> bool:
        """
        Returns
        -------
        bool
            The boolean for whether the emission map is tabular.
        """

    @abc.abstractmethod
    def node_to_observation(
        self, node: "NODE_TYPE", in_episode_time: Optional[int] = None
    ) -> np.ndarray:
        """
        Returns
        -------
        np.ndarray
            The non-tabular representation corresponding to the state given in input. Episodic MDPs also require the
            current in-episode time step.
        """

    @property
    def shape(self) -> Tuple[int, ...]:
        """
        Returns
        -------
        Tuple[int, ...]
            The shape of the non-tabular representation. It is computed once, from the observation of the first
            starting node at in-episode time zero, and then cached.
        """
        if self._shape is None:
            self._shape = self.node_to_observation(self._mdp.starting_nodes[0], 0).shape
        return self._shape

    @property
    def all_observations(self) -> np.ndarray:
        """
        Returns
        -------
        np.ndarray
            The float32 numpy array containing the non-tabular representations of all the states in the MDP. In the
            episodic case, its shape is (episode length, number of states, *observation shape). In the continuous
            case, the dimension for the episode is dropped. The array is built on first access and then cached.
        """
        if self._observations is None:
            # The setting cannot change while the table is being filled, so query it once.
            episodic = self._mdp.is_episodic()
            if episodic:
                self._observations = np.empty(
                    (self._mdp.H, self._mdp.n_states, *self.shape), np.float32
                )
            else:
                self._observations = np.empty(
                    (self._mdp.n_states, *self.shape), np.float32
                )

            # NOTE(review): the enumeration index is used as the state index, whereas get_observation looks states
            # up through node_to_index — this assumes both orderings agree; confirm.
            for i, n in enumerate(self._mdp.G.nodes):
                if episodic:
                    for h in range(self._mdp.H):
                        self._observations[h, i] = self.node_to_observation(n, h)
                else:
                    self._observations[i] = self.node_to_observation(n, None)
        return self._observations

    def __init__(
        self,
        mdp: "BaseMDP",
        noise_class: Optional[Type["Noise"]],
        noise_kwargs: Dict[str, Any],
    ):
        """
        Parameters
        ----------
        mdp : BaseMDP
            The tabular MDP.
        noise_class : Type["Noise"]
            The noise that renders the emission map stochastic. If None, the emission map is noise free.
        noise_kwargs : Dict[str, Any]
            The parameters for the noise class. They are not used when noise_class is None.
        """

        self._mdp = mdp
        self._cached_obs = dict()
        self._observations = None
        self._shape = None

        if noise_class is not None:
            # The shape is passed as a callable so that it is only computed when the noise actually needs it.
            self._noise_map = noise_class(shape_f=lambda: self.shape, **noise_kwargs)
        else:
            self._noise_map = None

    def get_observation(
        self, state: "NODE_TYPE", in_episode_time: Optional[int] = None
    ) -> np.ndarray:
        """
        Computes the observation numpy array corresponding to the state in input.

        Parameters
        ----------
        state : NODE_TYPE
            The state for which we are computing the observation.
        in_episode_time : int
            The in episode time. It is ignored in the continuous setting, and, by default, it is set to None, in
            which case the current in-episode time of the MDP is used in the episodic setting.

        Returns
        -------
        np.ndarray
            The observation, with the next noise sample added when a noise map is present. In the episodic setting,
            an array of zeros is returned when the in-episode time is not smaller than the episode length.
        """

        if self._mdp.is_episodic():
            if in_episode_time is None:
                in_episode_time = self._mdp.h
            if in_episode_time >= self._mdp.H:
                return np.zeros(self.shape, np.float32)
        else:
            in_episode_time = None

        # NOTE(review): in the continuous setting the first index is None, which numpy interprets as np.newaxis, so
        # the returned array carries a leading axis of length one — confirm that callers expect this.
        obs = self.all_observations[in_episode_time, self._mdp.node_to_index[state]]
        if self._noise_map is not None:
            return obs + next(self._noise_map)
        return obs


class StateLinear(EmissionMap, abc.ABC):
    """
    The base class for the emission map such that the non-tabular representation is a vector for which the value function
    of a given policy is linear.
    """

    def __init__(
        self,
        mdp: "BaseMDP",
        noise_class: Optional[Type["Noise"]],
        noise_kwargs: Dict[str, Any],
        d: Optional[int] = None,
    ):
        """
        Parameters
        ----------
        mdp : BaseMDP
            The tabular MDP.
        noise_class : Type["Noise"]
            The noise that renders the emission map stochastic.
        noise_kwargs : Dict[str, Any]
            The parameters for the noise class.
        d : int
            The dimensionality of the non-tabular representation vector. By default, it is the maximum between the
            minimum linear feature dimension given by the package configuration and ten percent of the number of
            states.
        """

        self._features = None
        self._d = (
            max(config.get_min_linear_feature_dim(), int(mdp.n_states * 0.1))
            if d is None
            else d
        )

        super().__init__(mdp, noise_class, noise_kwargs)

    @property
    def is_tabular(self) -> bool:
        """
        Returns
        -------
        bool
            Always False, since the linear representation is non-tabular.
        """
        return False

    @property
    @abc.abstractmethod
    def V(self) -> np.ndarray:
        """
        Returns
        -------
        np.ndarray
            The value function w.r.t. which the non-tabular representation is linear.
        """

    def _sample_features(self):
        """
        Samples the float32 feature table from the value function and stores it.
        """
        self._features = _sample_linear_value_features(
            self.V, self._d, self._mdp.H if self._mdp.is_episodic() else None
        ).astype(np.float32)

    def node_to_observation(
        self, node: "NODE_TYPE", in_episode_time: Optional[int] = None
    ) -> np.ndarray:
        """
        Returns
        -------
        np.ndarray
            The feature vector of the node. The features are sampled on the first call. The in-episode time is used
            as leading index only when it is given and the MDP is episodic.
        """
        if self._features is None:
            self._sample_features()
        if in_episode_time is not None and self._mdp.is_episodic():
            return self._features[in_episode_time, self._mdp.node_to_index[node]]
        return self._features[self._mdp.node_to_index[node]]


def get_emission_map_from_name(emission_map_name: str) -> Type[EmissionMap]:
    """
    Parameters
    ----------
    emission_map_name : str
        The CamelCase name of the emission map class. The module that is imported is the snake_case version of this
        name inside the colosseum.emission_maps package.

    Returns
    -------
    EmissionMap
        The EmissionMap class corresponding to the name of the emission map given in input.
    """
    # An underscore is inserted before every capital letter except the first one, e.g. AbCd -> ab_cd.
    module_name = re.sub(r"(?<!^)(?=[A-Z])", "_", emission_map_name).lower()
    module = importlib.import_module(f"colosseum.emission_maps.{module_name}")
    return getattr(module, emission_map_name)


def _sample_linear_value_features(v: np.ndarray, d: int, H: Optional[int] = None):
    """
    Samples random features that lie in a random d-dimensional subspace containing the constant vector and v.

    Parameters
    ----------
    v : np.ndarray
        The value function. NOTE(review): it is assigned to a single column, so it is presumably one-dimensional.
    d : int
        The dimensionality of the features. It must be at least two since columns zero and one are written.
    H : int
        The episode length. If given, the features are reshaped to (H + 1, -1, d).

    Returns
    -------
    np.ndarray
        The features, with columns of unit Euclidean norm before the optional reshape.
    """
    psi = np.random.randn(v.size, d)
    psi[:, 0] = 1
    psi[:, 1] = v

    # Orthogonal projector onto the column space of psi. The pseudo-inverse coincides with
    # inv(psi.T @ psi) @ psi.T when psi has full column rank and stays well defined when it does not.
    P = psi @ np.linalg.pinv(psi)

    W = np.random.randn(v.size, d)
    W[:, 0] = 1

    W_p = P @ W
    features = W_p / np.linalg.norm(W_p, axis=0, keepdims=True)
    if H is not None:
        features = features.reshape(H + 1, -1, d)
    return features


def _get_symbol_mapping(mdp: "BaseMDP") -> Dict[str, int]:
    """
    Returns
    -------
    Dict[str, int]
        The dictionary that maps each unique symbol of the MDP to its position in mdp.get_unique_symbols().
    """
    symbols = mdp.get_unique_symbols()
    return dict(zip(symbols, range(len(symbols))))
class
EmissionMap(abc.ABC):
class EmissionMap(abc.ABC):
    """
    The base class to define emission maps that transform tabular MDPs into non-tabular versions.
    """

    @property
    @abc.abstractmethod
    def is_tabular(self) -> bool:
        """
        Returns
        -------
        bool
            The boolean for whether the emission map is tabular.
        """

    @abc.abstractmethod
    def node_to_observation(
        self, node: "NODE_TYPE", in_episode_time: int = None
    ) -> np.ndarray:
        """
        Returns
        -------
        np.ndarray
            The non-tabular representation corresponding to the state given in input. Episodic MDPs also require the
            current in-episode time step.
        """

    @property
    def shape(self) -> Tuple[int, ...]:
        """
        Returns
        -------
        Tuple[int, ...]
            The shape of the non-tabular representation. It is computed once, from the observation of the first
            starting node at in-episode time zero, and then cached.
        """
        if self._shape is None:
            self._shape = self.node_to_observation(self._mdp.starting_nodes[0], 0).shape
        return self._shape

    @property
    def all_observations(self) -> np.ndarray:
        """
        Returns
        -------
        np.ndarray
            The float32 numpy array containing the non-tabular representations of all the states in the MDP. In the
            episodic case, its shape is (episode length, number of states, *observation shape). In the continuous
            case, the dimension for the episode is dropped. The array is built on first access and then cached.
        """
        if self._observations is None:
            # The setting cannot change while the table is being filled, so query it once.
            episodic = self._mdp.is_episodic()
            if episodic:
                self._observations = np.empty(
                    (self._mdp.H, self._mdp.n_states, *self.shape), np.float32
                )
            else:
                self._observations = np.empty(
                    (self._mdp.n_states, *self.shape), np.float32
                )

            # NOTE(review): the enumeration index is used as the state index, whereas get_observation looks states
            # up through node_to_index — this assumes both orderings agree; confirm.
            for i, n in enumerate(self._mdp.G.nodes):
                if episodic:
                    for h in range(self._mdp.H):
                        self._observations[h, i] = self.node_to_observation(n, h)
                else:
                    self._observations[i] = self.node_to_observation(n, None)
        return self._observations

    def __init__(
        self,
        mdp: "BaseMDP",
        noise_class: Type["Noise"],
        noise_kwargs: Dict[str, Any],
    ):
        """
        Parameters
        ----------
        mdp : BaseMDP
            The tabular MDP.
        noise_class : Type["Noise"]
            The noise that renders the emission map stochastic. If None, the emission map is noise free.
        noise_kwargs : Dict[str, Any]
            The parameters for the noise class. They are not used when noise_class is None.
        """

        self._mdp = mdp
        self._cached_obs = dict()
        self._observations = None
        self._shape = None

        if noise_class is not None:
            # The shape is passed as a callable so that it is only computed when the noise actually needs it.
            self._noise_map = noise_class(shape_f=lambda: self.shape, **noise_kwargs)
        else:
            self._noise_map = None

    def get_observation(
        self, state: "NODE_TYPE", in_episode_time: int = None
    ) -> np.ndarray:
        """
        Computes the observation numpy array corresponding to the state in input.

        Parameters
        ----------
        state : NODE_TYPE
            The state for which we are computing the observation.
        in_episode_time : int
            The in episode time. It is ignored in the continuous setting, and, by default, it is set to None, in
            which case the current in-episode time of the MDP is used in the episodic setting.

        Returns
        -------
        np.ndarray
            The observation, with the next noise sample added when a noise map is present. In the episodic setting,
            an array of zeros is returned when the in-episode time is not smaller than the episode length.
        """

        if self._mdp.is_episodic():
            if in_episode_time is None:
                in_episode_time = self._mdp.h
            if in_episode_time >= self._mdp.H:
                return np.zeros(self.shape, np.float32)
        else:
            in_episode_time = None

        # NOTE(review): in the continuous setting the first index is None, which numpy interprets as np.newaxis, so
        # the returned array carries a leading axis of length one — confirm that callers expect this.
        obs = self.all_observations[in_episode_time, self._mdp.node_to_index[state]]
        if self._noise_map is not None:
            return obs + next(self._noise_map)
        return obs
The base class to define emission maps that transform tabular MDPs into non-tabular versions.
EmissionMap( mdp: colosseum.mdp.base.BaseMDP, noise_class: Type[colosseum.noises.base.Noise], noise_kwargs: Dict[str, Any])
def __init__(
    self,
    mdp: "BaseMDP",
    noise_class: Type["Noise"],
    noise_kwargs: Dict[str, Any],
):
    """
    Store the MDP, reset the lazily computed caches and build the optional noise map.

    Parameters
    ----------
    mdp : BaseMDP
        The tabular MDP.
    noise_class : Type["Noise"]
        The noise that renders the emission map stochastic. If None, no noise map is created.
    noise_kwargs : Dict[str, Any]
        The parameters for the noise class. They are forwarded as keyword arguments.
    """
    self._mdp = mdp
    self._cached_obs = {}
    # Both values are filled in on first access by the corresponding properties.
    self._observations = None
    self._shape = None

    # The shape is handed over as a callable so that it is only evaluated when the noise asks for it.
    self._noise_map = (
        None
        if noise_class is None
        else noise_class(shape_f=lambda: self.shape, **noise_kwargs)
    )
Parameters
- mdp (BaseMDP): The tabular MDP.
- noise_class (Type["Noise"]): The noise that renders the emission map stochastic.
- noise_kwargs (Dict[str, Any]): The parameters for the noise class.
@abc.abstractmethod
def
node_to_observation( self, node: Union[colosseum.mdp.custom_mdp.CustomNode, colosseum.mdp.river_swim.base.RiverSwimNode, colosseum.mdp.deep_sea.base.DeepSeaNode, colosseum.mdp.frozen_lake.base.FrozenLakeNode, colosseum.mdp.simple_grid.base.SimpleGridNode, colosseum.mdp.minigrid_empty.base.MiniGridEmptyNode, colosseum.mdp.minigrid_rooms.base.MiniGridRoomsNode, colosseum.mdp.taxi.base.TaxiNode], in_episode_time: int = None) -> numpy.ndarray:
@abc.abstractmethod
def node_to_observation(
    self, node: "NODE_TYPE", in_episode_time: int = None
) -> np.ndarray:
    """
    Map a state of the MDP to its non-tabular representation.

    Parameters
    ----------
    node : NODE_TYPE
        The state to be represented.
    in_episode_time : int
        The current in-episode time step, which episodic MDPs also require. By default, it is set to None.

    Returns
    -------
    np.ndarray
        The non-tabular representation corresponding to the state given in input.
    """
Returns
- np.ndarray: The non-tabular representation corresponding to the state given in input. Episodic MDPs also require the current in-episode time step.
all_observations: numpy.ndarray
Returns
- np.ndarray: The numpy array containing the non-tabular representations of all the states in the MDP. In the episodic case, its shape is (episode length, number of states, *observation shape). In the continuous case, the dimension for the episode is dropped.
def
get_observation( self, state: Union[colosseum.mdp.custom_mdp.CustomNode, colosseum.mdp.river_swim.base.RiverSwimNode, colosseum.mdp.deep_sea.base.DeepSeaNode, colosseum.mdp.frozen_lake.base.FrozenLakeNode, colosseum.mdp.simple_grid.base.SimpleGridNode, colosseum.mdp.minigrid_empty.base.MiniGridEmptyNode, colosseum.mdp.minigrid_rooms.base.MiniGridRoomsNode, colosseum.mdp.taxi.base.TaxiNode], in_episode_time: int = None) -> numpy.ndarray:
def get_observation(
    self, state: "NODE_TYPE", in_episode_time: int = None
) -> np.ndarray:
    """
    Look up the observation of a state and add noise to it when a noise map is present.

    Parameters
    ----------
    state : NODE_TYPE
        The state for which we are computing the observation.
    in_episode_time : int
        The in episode time. In the episodic setting, None (the default) stands for the current in-episode time of
        the MDP. In the continuous setting the value is ignored.

    Returns
    -------
    np.ndarray
        The observation. In the episodic setting, an array of zeros is returned when the in-episode time is not
        smaller than the episode length.
    """
    mdp = self._mdp
    if mdp.is_episodic():
        if in_episode_time is None:
            in_episode_time = mdp.h
        if in_episode_time >= mdp.H:
            return np.zeros(self.shape, np.float32)
    else:
        in_episode_time = None

    # NOTE(review): in the continuous setting the first index is None, which numpy interprets as np.newaxis, so
    # the result carries a leading axis of length one — confirm that callers expect this.
    clean = self.all_observations[in_episode_time, mdp.node_to_index[state]]
    if self._noise_map is None:
        return clean
    return clean + next(self._noise_map)
Computes the observation numpy array corresponding to the state in input.
Parameters
- state (NODE_TYPE): The state for which we are computing the observation.
- in_episode_time (int): The in episode time. It is ignored in the continuous setting, and, by default, it is set to None.
Returns
- np.ndarray: The observation.
class StateLinear(EmissionMap, abc.ABC):
    """
    The base class for the emission maps whose non-tabular representation is a vector in which the value function of
    a given policy is linear.
    """

    def __init__(
        self,
        mdp: "BaseMDP",
        noise_class: Type["Noise"],
        noise_kwargs: Dict[str, Any],
        d: int = None,
    ):
        """
        Parameters
        ----------
        mdp : BaseMDP
            The tabular MDP.
        noise_class : Type["Noise"]
            The noise that renders the emission map stochastic.
        noise_kwargs : Dict[str, Any]
            The parameters for the noise class.
        d : int
            The dimensionality of the non-tabular representation vector. By default, it is the maximum between the
            minimum linear feature dimension given by the package configuration and ten percent of the number of
            states.
        """
        # The feature table is sampled on the first call to node_to_observation.
        self._features = None
        if d is None:
            d = max(config.get_min_linear_feature_dim(), int(mdp.n_states * 0.1))
        self._d = d

        super().__init__(mdp, noise_class, noise_kwargs)

    @property
    def is_tabular(self) -> bool:
        """
        Returns
        -------
        bool
            Always False.
        """
        return False

    @property
    @abc.abstractmethod
    def V(self) -> np.ndarray:
        """
        Returns
        -------
        np.ndarray
            The value function w.r.t. which the non-tabular representation is linear.
        """

    def _sample_features(self):
        """
        Samples the feature table from the value function and stores it as a float32 array.
        """
        value_function = self.V
        horizon = self._mdp.H if self._mdp.is_episodic() else None
        sampled = _sample_linear_value_features(value_function, self._d, horizon)
        self._features = sampled.astype(np.float32)

    def node_to_observation(
        self, node: "NODE_TYPE", in_episode_time: int = None
    ) -> np.ndarray:
        """
        Returns
        -------
        np.ndarray
            The feature vector of the node. The in-episode time is used as leading index only when it is given and
            the MDP is episodic.
        """
        if self._features is None:
            self._sample_features()

        time_indexed = in_episode_time is not None and self._mdp.is_episodic()
        node_index = self._mdp.node_to_index[node]
        if time_indexed:
            return self._features[in_episode_time, node_index]
        return self._features[node_index]
The base class for the emission map such that the non-tabular representation is a vector for which the value function of a given policy is linear.
StateLinear( mdp: colosseum.mdp.base.BaseMDP, noise_class: Type[colosseum.noises.base.Noise], noise_kwargs: Dict[str, Any], d: int = None)
def __init__(
    self,
    mdp: "BaseMDP",
    noise_class: Type["Noise"],
    noise_kwargs: Dict[str, Any],
    d: int = None,
):
    """
    Choose the feature dimensionality and delegate the remaining set-up to the parent class.

    Parameters
    ----------
    mdp : BaseMDP
        The tabular MDP.
    noise_class : Type["Noise"]
        The noise that renders the emission map stochastic.
    noise_kwargs : Dict[str, Any]
        The parameters for the noise class.
    d : int
        The dimensionality of the non-tabular representation vector. By default, it is the maximum between the
        minimum linear feature dimension given by the package configuration and ten percent of the number of states.
    """
    # No feature table exists until one is sampled.
    self._features = None
    if d is None:
        d = max(config.get_min_linear_feature_dim(), int(mdp.n_states * 0.1))
    self._d = d

    super(StateLinear, self).__init__(mdp, noise_class, noise_kwargs)
Parameters
- mdp (BaseMDP): The tabular MDP.
- noise_class (Type["Noise"]): The noise that renders the emission map stochastic.
- noise_kwargs (Dict[str, Any]): The parameters for the noise class.
- d (int): The dimensionality of the non-tabular representation vector.
V: numpy.ndarray
Returns
- np.ndarray: The value function w.r.t. which the non-tabular representation is linear.
def
node_to_observation( self, node: Union[colosseum.mdp.custom_mdp.CustomNode, colosseum.mdp.river_swim.base.RiverSwimNode, colosseum.mdp.deep_sea.base.DeepSeaNode, colosseum.mdp.frozen_lake.base.FrozenLakeNode, colosseum.mdp.simple_grid.base.SimpleGridNode, colosseum.mdp.minigrid_empty.base.MiniGridEmptyNode, colosseum.mdp.minigrid_rooms.base.MiniGridRoomsNode, colosseum.mdp.taxi.base.TaxiNode], in_episode_time: int = None) -> numpy.ndarray:
def node_to_observation(
    self, node: "NODE_TYPE", in_episode_time: int = None
) -> np.ndarray:
    """
    Return the linear feature vector of a node, sampling the feature table on first use.

    Parameters
    ----------
    node : NODE_TYPE
        The state to be represented.
    in_episode_time : int
        The in-episode time. It is used as leading index only when it is given and the MDP is episodic.

    Returns
    -------
    np.ndarray
        The entry of the feature table for the node.
    """
    if self._features is None:
        self._sample_features()

    time_indexed = in_episode_time is not None and self._mdp.is_episodic()
    node_index = self._mdp.node_to_index[node]
    if time_indexed:
        return self._features[in_episode_time, node_index]
    return self._features[node_index]
Returns
- np.ndarray: The non-tabular representation corresponding to the state given in input. Episodic MDPs also require the current in-episode time step.
Inherited Members
def
get_emission_map_from_name(emission_map_name: str) -> Type[colosseum.emission_maps.base.EmissionMap]:
def get_emission_map_from_name(emission_map_name: str) -> Type[EmissionMap]:
    """
    Import the module of an emission map and return the class defined in it.

    Parameters
    ----------
    emission_map_name : str
        The CamelCase name of the emission map class. The module that is imported is the snake_case version of this
        name inside the colosseum.emission_maps package.

    Returns
    -------
    EmissionMap
        The EmissionMap class corresponding to the name of the emission map given in input.
    """
    # An underscore is inserted before every capital letter except the first one, e.g. AbCd -> ab_cd.
    module_name = re.sub(r"(?<!^)(?=[A-Z])", "_", emission_map_name).lower()
    module = importlib.import_module(f"colosseum.emission_maps.{module_name}")
    return getattr(module, emission_map_name)
Returns
- EmissionMap: The EmissionMap class corresponding to the name of the emission map given in input.