colosseum.mdp.utils.mdp_creation
import os
from dataclasses import dataclass
from typing import TYPE_CHECKING, Callable, Dict, List, Tuple

import networkx as nx
import numpy as np
import sparse
from numpy.core._exceptions import _ArrayMemoryError
from scipy.stats import rv_continuous
from tqdm import tqdm

from colosseum import config
from colosseum.mdp.utils.custom_samplers import NextStateSampler

if TYPE_CHECKING:
    from colosseum.mdp import ACTION_TYPE, NODE_TYPE, BaseMDP


@dataclass()
class NodeInfoClass:
    """
    The data class containing some quantities related to the nodes.
    """

    transition_distributions: Dict[int, NextStateSampler]
    """A dictionary that maps actions to next state distributions."""
    actions_visitation_count: Dict[int, int]
    """The dictionary that keeps the count of how many times each action has been selected for the current node."""
    state_visitation_count: int = 0
    """The counter for the number of times the node has been visited."""

    def update_visitation_counts(self, action: int = None):
        self.state_visitation_count += 1
        if action is not None:
            self.actions_visitation_count[action] += 1

    def sample_next_state(self, action: int):
        return self.transition_distributions[action].sample()


def get_transition_matrix_and_rewards(
    n_states: int,
    n_actions: int,
    G: nx.DiGraph,
    get_info_class: Callable[["NODE_TYPE"], NodeInfoClass],
    get_reward_distribution: Callable[
        ["NODE_TYPE", "ACTION_TYPE", "NODE_TYPE"], rv_continuous
    ],
    node_to_index: Dict["NODE_TYPE", int],
    is_sparse: bool = False,
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Returns
    -------
    np.ndarray
        The transition 3d array of the MDP.
    np.ndarray
        The reward matrix of the MDP.
    """
    if not is_sparse:
        # Fall back to a sparse representation when the dense transition
        # tensor would take more than 10% of the physical memory.
        mem_bytes = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES")
        is_sparse = n_states ** 2 * n_actions * np.float32().itemsize > 0.1 * mem_bytes
    if is_sparse:
        T = dict()
    else:
        T = np.zeros(
            (n_states, n_actions, n_states),
            dtype=np.float32,
        )

    R = np.zeros((n_states, n_actions), dtype=np.float32)
    for i, node in enumerate(G.nodes()):
        for action, td in get_info_class(node).transition_distributions.items():
            r = 0
            for next_state, prob in zip(td.next_nodes, td.probs):
                r += prob * get_reward_distribution(node, action, next_state).mean()
                if is_sparse and (i, action, node_to_index[next_state]) not in T:
                    T[i, action, node_to_index[next_state]] = prob
                    continue
                T[i, action, node_to_index[next_state]] += prob
            R[i, action] = r

    if is_sparse:
        coords = [[], [], []]
        data = []
        for k, v in T.items():
            coords[0].append(k[0])
            coords[1].append(k[1])
            coords[2].append(k[2])
            data.append(np.float32(v))
        T = sparse.COO(coords, data, shape=(n_states, n_actions, n_states))

    assert np.isclose(T.sum(-1).todense() if is_sparse else T.sum(-1), 1).all()
    assert np.isnan(R).sum() == 0
    return T, R


def get_episodic_transition_matrix_and_rewards(
    H: int,
    T: np.ndarray,
    R: np.ndarray,
    starting_node_sampler: NextStateSampler,
    node_to_index: Dict["NODE_TYPE", int],
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Returns
    -------
    np.ndarray
        The episodic transition 4d array of the MDP.
    np.ndarray
        The 3d episodic reward array of the MDP.
    """
    n_states, n_actions = R.shape
    T_epi = np.zeros(
        (H, n_states, n_actions, n_states),
        dtype=np.float32,
    )
    for sn, p in starting_node_sampler.next_nodes_and_probs:
        sn = node_to_index[sn]
        T_epi[0, sn] = T[sn]
        # At the last in-episode time step, every state transitions back to
        # the starting states with their starting probabilities.
        T_epi[H - 1, :, :, sn] = p
    for h in range(1, H - 1):
        # Only propagate the dynamics to states reachable at the previous step.
        for s in range(len(T)):
            if T_epi[h - 1, :, :, s].sum() > 0:
                T_epi[h, s] = T[s]
    R = np.tile(R, (H, 1, 1))
    R[-1] = 0.0
    return T_epi, R


def get_continuous_form_episodic_transition_matrix_and_rewards(
    H: int,
    G: nx.DiGraph,
    T: np.ndarray,
    R: np.ndarray,
    starting_node_sampler: NextStateSampler,
    node_to_index: Dict["NODE_TYPE", int],
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Returns
    -------
    np.ndarray
        The transition 3d array for the continuous form of the MDP.
    np.ndarray
        The reward matrix for the continuous form of the MDP.
    """
    _, n_action = R.shape

    try:
        T_epi = np.zeros((len(G.nodes), n_action, len(G.nodes)), np.float32)
        R_epi = np.zeros((len(G.nodes), n_action), np.float32)
    except _ArrayMemoryError:
        raise ValueError(
            "It is not possible to calculate the value for this MDP. "
            "Its continuous form is too large."
        )

    nodes = list(G.nodes)
    for h, n in (
        tqdm(
            G.nodes,
            desc="Continuous form episodic transition matrix and rewards",
        )
        if config.VERBOSE_LEVEL > 0
        else G.nodes
    ):
        if h == H - 1:
            for sn, p in starting_node_sampler.next_nodes_and_probs:
                T_epi[nodes.index((h, n)), :, node_to_index[sn]] = p
            R_epi[nodes.index((h, n))] = R[n]
        else:
            for hp1, nn in G.successors((h, n)):
                T_epi[nodes.index((h, n)), :, nodes.index((hp1, nn))] = T[n, :, nn]
            R_epi[nodes.index((h, n))] = R[n]

    assert np.isclose(T_epi.sum(-1), 1.0).all()
    return T_epi, R_epi


def get_episodic_graph(
    G: nx.DiGraph,
    H: int,
    node_to_index: Dict["NODE_TYPE", int],
    starting_nodes,
    remove_label=False,
) -> nx.DiGraph:
    """
    Returns
    -------
    nx.DiGraph
        The graph of the MDP augmented with the time step in the state space.
    """

    def add_successors(n, h):
        n_ = node_to_index[n] if remove_label else n
        if h < H - 1:
            successors = G.successors(n)
        else:
            # At the last time step, loop back to the starting nodes.
            successors = starting_nodes
        for succ in successors:
            succ_ = node_to_index[succ] if remove_label else succ
            next_h = (h + 1) if h + 1 != H else 0
            G_epi.add_edge((h, n_), (next_h, succ_))
            if h < H - 1 and len(list(G_epi.successors((next_h, succ_)))) == 0:
                add_successors(succ, next_h)

    G_epi = nx.DiGraph()
    for sn in starting_nodes:
        add_successors(sn, 0)
    return G_epi


def instantiate_transitions(mdp: "BaseMDP", node: "NODE_TYPE"):
    """
    Recursively instantiates the transitions of the MDP.
    """
    if not mdp.G.has_node(node) or len(list(mdp.G.successors(node))) == 0:
        transition_distributions = dict()
        for a in range(mdp.n_actions):
            td = _instantiate_individual_transition(mdp, node, a)

            if not td.is_deterministic:
                mdp._are_all_transition_deterministic = False

            for ns in td.next_nodes:
                instantiate_transitions(mdp, ns)
            transition_distributions[mdp._inverse_action_mapping(node, a)] = td

        assert all(
            action in transition_distributions.keys() for action in range(mdp.n_actions)
        )
        _add_node_info_class(mdp, node, transition_distributions)


def _compute_transition(mdp: "BaseMDP", next_states, probs, node, action, next_node, p):
    next_states.append(next_node)
    probs.append(p)
    if (
        mdp._are_all_rewards_deterministic
        and mdp.get_reward_distribution(node, action, next_node).dist.name
        != "deterministic"
    ):
        mdp._are_all_rewards_deterministic = False
    mdp.G.add_edge(node, next_node)


def _add_node_info_class(
    mdp: "BaseMDP",
    n: "NODE_TYPE",
    transition_distributions: Dict[int, NextStateSampler],
):
    """
    Adds a container class (NodeInfoClass) to the state n containing the
    transition distributions.

    Parameters
    ----------
    n : NodeType
        The state to which the NodeInfoClass is added.
    transition_distributions : Dict[int, NextStateSampler]
        The dictionary containing the transition distributions.
    """
    mdp.G.nodes[n]["info_class"] = NodeInfoClass(
        transition_distributions=transition_distributions,
        actions_visitation_count=dict.fromkeys(range(mdp.n_actions), 0),
    )


def _get_next_node(
    mdp: "BaseMDP", node: "NODE_TYPE", action: "ACTION_TYPE"
) -> List[Tuple["NODE_TYPE", float]]:
    return [
        (mdp.get_node_class()(**node_prms), prob)
        for node_prms, prob in mdp._get_next_nodes_parameters(node, action)
    ]


def _instantiate_individual_transition(
    mdp: "BaseMDP", node: "NODE_TYPE", action: "ACTION_TYPE"
) -> NextStateSampler:
    next_nodes = []
    probs = []
    # Probability mass left after the lazy mechanism, which keeps the agent
    # in place with probability _p_lazy. Defined before the loops so that it
    # is available even when the selected action has no successors.
    p1_lazy = 1.0 if mdp._p_lazy is None else (1 - mdp._p_lazy)
    for next_node, p in _get_next_node(mdp, node, action):
        p = p1_lazy * p
        p = (
            p
            if mdp._p_rand is None
            else ((1 - mdp._p_rand) * p + p * mdp._p_rand / mdp.n_actions)
        )
        _compute_transition(mdp, next_nodes, probs, node, action, next_node, p)
    if mdp._p_lazy is not None:
        next_node = node
        _compute_transition(
            mdp, next_nodes, probs, node, action, next_node, mdp._p_lazy
        )
    if mdp._p_rand is not None:
        # Spread the random-action mass over the successors of the other actions.
        for a in range(mdp.n_actions):
            if a == action:
                continue
            for next_node, p in _get_next_node(mdp, node, a):
                p = p1_lazy * mdp._p_rand * p / mdp.n_actions
                _compute_transition(mdp, next_nodes, probs, node, action, next_node, p)

    assert np.isclose(sum(probs), 1.0)

    return NextStateSampler(
        next_nodes=next_nodes,
        probs=probs,
        seed=mdp._produce_random_seed(),
    )
@dataclass()
class NodeInfoClass:
The data class containing some quantities related to the nodes.
NodeInfoClass(
    transition_distributions: Dict[int, NextStateSampler],
    actions_visitation_count: Dict[int, int],
    state_visitation_count: int = 0,
)
transition_distributions: Dict[int, NextStateSampler]
A dictionary that maps actions to next state distributions.
actions_visitation_count: Dict[int, int]
The dictionary that keeps the count of how many times each action has been selected for the current node.
state_visitation_count: int = 0
The counter for the number of times the node has been visited.
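A minimal usage sketch of the bookkeeping above. _StubSampler is an illustrative stand-in for NextStateSampler (only its sample method is exercised); everything else uses NodeInfoClass exactly as defined in this module.

from colosseum.mdp.utils.mdp_creation import NodeInfoClass

class _StubSampler:
    # Stands in for NextStateSampler; deterministically returns one next state.
    def __init__(self, next_state):
        self.next_state = next_state

    def sample(self):
        return self.next_state

info = NodeInfoClass(
    transition_distributions={0: _StubSampler(1), 1: _StubSampler(2)},
    actions_visitation_count={0: 0, 1: 0},
)
info.update_visitation_counts(action=0)  # node visited once, action 0 selected once
assert info.state_visitation_count == 1
assert info.actions_visitation_count == {0: 1, 1: 0}
assert info.sample_next_state(action=1) == 2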
def get_transition_matrix_and_rewards(
    n_states: int,
    n_actions: int,
    G: networkx.DiGraph,
    get_info_class: Callable[[NODE_TYPE], NodeInfoClass],
    get_reward_distribution: Callable[[NODE_TYPE, ACTION_TYPE, NODE_TYPE], rv_continuous],
    node_to_index: Dict[NODE_TYPE, int],
    is_sparse: bool = False,
) -> Tuple[numpy.ndarray, numpy.ndarray]:

(In this and the following signatures, NODE_TYPE abbreviates the union of the concrete node classes CustomNode, RiverSwimNode, DeepSeaNode, FrozenLakeNode, SimpleGridNode, MiniGridEmptyNode, MiniGridRoomsNode, and TaxiNode, and ACTION_TYPE abbreviates Union[int, float, numpy.ndarray].)
Returns
- np.ndarray: The transition 3d array of the MDP.
- np.ndarray: The reward matrix of the MDP.
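The returned arrays follow the T[s, a, s'] and R[s, a] conventions, so a discounted Bellman backup is a single tensor contraction. A minimal sketch with random stand-in arrays; the shapes, not the values, are what get_transition_matrix_and_rewards guarantees.

import numpy as np

rng = np.random.default_rng(42)
S, A = 3, 2
T = rng.random((S, A, S)).astype(np.float32)
T /= T.sum(-1, keepdims=True)           # each T[s, a] is a distribution over s'
R = rng.random((S, A)).astype(np.float32)

gamma = 0.99
V = np.zeros(S, dtype=np.float32)
for _ in range(1000):                    # value iteration to a fixed point
    Q = R + gamma * np.einsum("sap,p->sa", T, V)
    V = Q.max(axis=1)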
def get_episodic_transition_matrix_and_rewards(
    H: int,
    T: numpy.ndarray,
    R: numpy.ndarray,
    starting_node_sampler: NextStateSampler,
    node_to_index: Dict[NODE_TYPE, int],
) -> Tuple[numpy.ndarray, numpy.ndarray]:
Returns
- np.ndarray: The episodic transition 4d array of the MDP.
- np.ndarray: The 3d episodic reward array of the MDP.
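The in-episode time step indexes the first axis of both arrays, so finite-horizon backward induction reads directly off them. A sketch with random stand-ins shaped like the outputs; the zeroed reward at the last in-episode step mirrors the R[-1] = 0.0 assignment in the source.

import numpy as np

rng = np.random.default_rng(0)
H, S, A = 4, 3, 2
T_epi = rng.random((H, S, A, S)).astype(np.float32)
T_epi /= T_epi.sum(-1, keepdims=True)    # row-stochastic at every time step
R_epi = np.tile(rng.random((S, A)).astype(np.float32), (H, 1, 1))
R_epi[-1] = 0.0                          # last in-episode step pays no reward

V = np.zeros(S, dtype=np.float32)
for h in reversed(range(H)):             # backward induction over the horizon
    Q = R_epi[h] + np.einsum("sap,p->sa", T_epi[h], V)
    V = Q.max(axis=1)
# V now holds the optimal H-step value of each state at time step 0.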
def get_continuous_form_episodic_transition_matrix_and_rewards(
    H: int,
    G: networkx.DiGraph,
    T: numpy.ndarray,
    R: numpy.ndarray,
    starting_node_sampler: NextStateSampler,
    node_to_index: Dict[NODE_TYPE, int],
) -> Tuple[numpy.ndarray, numpy.ndarray]:
Returns
- np.ndarray: The transition 3d array for the continuous form of the MDP.
- np.ndarray: The reward matrix for the continuous form of the MDP.
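In the continuous form, each augmented state is an (h, node) pair and the last in-episode step loops back to the starting states, so the episodic MDP can be treated as an infinite-horizon one. A sketch of estimating a fixed uniform policy's stationary distribution and average reward from such arrays; the random stand-ins below only reproduce the shapes of the real outputs.

import numpy as np

rng = np.random.default_rng(1)
N, A = 6, 2                              # N = number of (h, node) pairs
T_cf = rng.random((N, A, N)).astype(np.float32)
T_cf /= T_cf.sum(-1, keepdims=True)
R_cf = rng.random((N, A)).astype(np.float32)

P = T_cf.mean(axis=1)                    # Markov chain of the uniform policy
mu = np.full(N, 1.0 / N)
for _ in range(5000):                    # power iteration for the stationary dist.
    mu = mu @ P
average_reward = mu @ R_cf.mean(axis=1)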
def get_episodic_graph(
    G: networkx.DiGraph,
    H: int,
    node_to_index: Dict[NODE_TYPE, int],
    starting_nodes,
    remove_label=False,
) -> networkx.DiGraph:
Returns
- nx.DiGraph: The graph of the MDP augmented with the time step in the state space.
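A concrete toy call: a two-state chain with a self-loop on state 1 and horizon 3. Each episodic node is a (time step, state) pair, and the last time step wires back to the starting nodes at time step 0. Note that node_to_index is only consulted when remove_label=True.

import networkx as nx
from colosseum.mdp.utils.mdp_creation import get_episodic_graph

G = nx.DiGraph([(0, 1), (1, 1)])         # 0 -> 1, 1 -> 1
G_epi = get_episodic_graph(G, H=3, node_to_index={0: 0, 1: 1}, starting_nodes=[0])
print(sorted(G_epi.edges))
# [((0, 0), (1, 1)), ((1, 1), (2, 1)), ((2, 1), (0, 0))]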
def instantiate_transitions(mdp: BaseMDP, node: NODE_TYPE):
Recursively instantiates the transitions of the MDP.
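A minimal runnable sketch, assuming only what the function's own body requires of the mdp object. _TinyMDP and _Node are illustrative stand-ins, not colosseum classes: they implement just the BaseMDP surface that instantiate_transitions touches, here a deterministic two-state chain with an absorbing state.

from dataclasses import dataclass

import networkx as nx
from scipy.stats import norm

from colosseum.mdp.utils.mdp_creation import instantiate_transitions

@dataclass(frozen=True)
class _Node:
    x: int

class _TinyMDP:
    n_actions = 1
    _p_lazy = None                       # no lazy mechanism
    _p_rand = None                       # no random-action mechanism
    _are_all_transition_deterministic = True
    _are_all_rewards_deterministic = True

    def __init__(self):
        self.G = nx.DiGraph()

    def get_node_class(self):
        return _Node

    def _get_next_nodes_parameters(self, node, action):
        return [(dict(x=min(node.x + 1, 1)), 1.0)]  # 0 -> 1, 1 -> 1

    def _inverse_action_mapping(self, node, action):
        return action

    def get_reward_distribution(self, node, action, next_node):
        return norm(loc=float(next_node.x), scale=1.0)

    def _produce_random_seed(self):
        return 42

mdp = _TinyMDP()
instantiate_transitions(mdp, _Node(0))      # walks every node reachable from _Node(0)
info = mdp.G.nodes[_Node(0)]["info_class"]  # NodeInfoClass attached by the recursion
print(info.transition_distributions)        # one NextStateSampler per action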