colosseum.mdp.utils.mdp_creation

  1import os
  2from dataclasses import dataclass
  3from typing import TYPE_CHECKING, Callable, Dict, List, Tuple
  4
  5import networkx as nx
  6import numpy as np
  7import sparse
  8from numpy.core._exceptions import _ArrayMemoryError
  9from scipy.stats import rv_continuous
 10from tqdm import tqdm
 11
 12from colosseum import config
 13from colosseum.mdp.utils.custom_samplers import NextStateSampler
 14
 15if TYPE_CHECKING:
 16    from colosseum.mdp import ACTION_TYPE, NODE_TYPE, BaseMDP
 17
 18
 19@dataclass()
 20class NodeInfoClass:
 21    """
 22    The data class containing some quantities related to the nodes.
 23    """
 24
 25    transition_distributions: Dict[int, NextStateSampler]
 26    """A dictionary that maps actions to next state distributions."""
 27    actions_visitation_count: Dict[int, int]
 28    """The dictionary that keeps the count of how many times each action has been selected for the current node."""
 29    state_visitation_count: int = 0
 30    """The counter for the number of times the node has been visited."""
 31
 32    def update_visitation_counts(self, action: int = None):
 33        self.state_visitation_count += 1
 34        if action is not None:
 35            self.actions_visitation_count[action] += 1
 36
 37    def sample_next_state(self, action: int):
 38        return self.transition_distributions[action].sample()
 39
 40
 41def get_transition_matrix_and_rewards(
 42    n_states: int,
 43    n_actions: int,
 44    G: nx.DiGraph,
 45    get_info_class: Callable[["NODE_TYPE"], NodeInfoClass],
 46    get_reward_distribution: Callable[
 47        ["NODE_TYPE", "ACTION_TYPE", "NODE_TYPE"], rv_continuous
 48    ],
 49    node_to_index: Dict["NODE_TYPE", int],
 50    is_sparse: bool = False,
 51) -> Tuple[np.ndarray, np.ndarray]:
 52    """
 53    Returns
 54    -------
 55    np.ndarray
 56        The transition 3d array of the MDP.
 57    np.ndarray
 58        The reward matrix of the MDP.
 59    """
 60    if not is_sparse:
 61        mem_bytes = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES")
 62        is_sparse = n_states ** 2 * n_actions * np.float32().itemsize > 0.1 * mem_bytes
 63    if is_sparse:
 64        T = dict()
 65    else:
 66        T = np.zeros(
 67            (n_states, n_actions, n_states),
 68            dtype=np.float32,
 69        )
 70
 71    R = np.zeros((n_states, n_actions), dtype=np.float32)
 72    for i, node in enumerate(G.nodes()):
 73        for action, td in get_info_class(node).transition_distributions.items():
 74            r = 0
 75            for next_state, prob in zip(td.next_nodes, td.probs):
 76                r += prob * get_reward_distribution(node, action, next_state).mean()
 77                if is_sparse and (i, action, node_to_index[next_state]) not in T:
 78                    T[i, action, node_to_index[next_state]] = prob
 79                    continue
 80                T[i, action, node_to_index[next_state]] += prob
 81            R[i, action] = r
 82
 83    if is_sparse:
 84        coords = [[], [], []]
 85        data = []
 86        for k, v in T.items():
 87            coords[0].append(k[0])
 88            coords[1].append(k[1])
 89            coords[2].append(k[2])
 90            data.append(np.float32(v))
 91        T = sparse.COO(coords, data, shape=(n_states, n_actions, n_states))
 92
 93    assert np.isclose(T.sum(-1).todense() if is_sparse else T.sum(-1), 1).all()
 94    assert np.isnan(R).sum() == 0
 95    return T, R
 96
 97
 98def get_episodic_transition_matrix_and_rewards(
 99    H: int,
100    T: np.ndarray,
101    R: np.ndarray,
102    starting_node_sampler: NextStateSampler,
103    node_to_index: Dict["NODE_TYPE", int],
104) -> Tuple[np.ndarray, np.ndarray]:
105    """
106    Returns
107    -------
108    np.ndarray
109        The episodic transition 4d array of the MDP.
110    np.ndarray
111        The 3d episodic reward array of the MDP.
112    """
113    n_states, n_actions = R.shape
114    T_epi = np.zeros(
115        (H, n_states, n_actions, n_states),
116        dtype=np.float32,
117    )
118    for sn, p in starting_node_sampler.next_nodes_and_probs:
119        sn = node_to_index[sn]
120        T_epi[0, sn] = T[sn]
121        T_epi[H - 1, :, :, sn] = p
122    for h in range(1, H - 1):
123        for s in range(len(T)):
124            if T_epi[h - 1, :, :, s].sum() > 0:
125                T_epi[h, s] = T[s]
126    R = np.tile(R, (H, 1, 1))
127    R[-1] = 0.0
128    return T_epi, R
129
130
131def get_continuous_form_episodic_transition_matrix_and_rewards(
132    H: int,
133    G: nx.DiGraph,
134    T: np.ndarray,
135    R: np.ndarray,
136    starting_node_sampler: NextStateSampler,
137    node_to_index: Dict["NODE_TYPE", int],
138) -> Tuple[np.ndarray, np.ndarray]:
139    """
140    Returns
141    -------
142    np.ndarray
143        The transition 3d array for the continuous form of the MDP.
144    np.ndarray
145        The reward matrix for the continuous form of the MDP.
146    """
147    _, n_action = R.shape
148
149    try:
150        T_epi = np.zeros((len(G.nodes), n_action, len(G.nodes)), np.float32)
151        R_epi = np.zeros((len(G.nodes), n_action), np.float32)
152    except _ArrayMemoryError:
153        raise ValueError(
154            "It is not possible calculate the value for this MDP. Its continuous form is too large."
155        )
156
157    nodes = list(G.nodes)
158    for h, n in (
159        tqdm(
160            G.nodes,
161            desc="Continuous form episodic transition matrix and rewards",
162        )
163        if config.VERBOSE_LEVEL > 0
164        else G.nodes
165    ):
166        if h == H - 1:
167            for sn, p in starting_node_sampler.next_nodes_and_probs:
168                T_epi[nodes.index((h, n)), :, node_to_index[sn]] = p
169                R_epi[nodes.index((h, n))] = R[n]
170        else:
171            for hp1, nn in G.successors((h, n)):
172                T_epi[nodes.index((h, n)), :, nodes.index((hp1, nn))] = T[n, :, nn]
173                R_epi[nodes.index((h, n))] = R[n]
174
175    assert np.isclose(T_epi.sum(-1), 1.0).all()
176    return T_epi, R_epi
177
178
179def get_episodic_graph(
180    G: nx.DiGraph,
181    H: int,
182    node_to_index: Dict["NODE_TYPE", int],
183    starting_nodes,
184    remove_label=False,
185) -> nx.DiGraph:
186    """
187    Returns
188    -------
189    nx.DiGraph
190        The graph of the MDP augmented with the time step in the state space.
191    """
192
193    def add_successors(n, h):
194        n_ = node_to_index[n] if remove_label else n
195        if h < H - 1:
196            successors = G.successors(n)
197        else:
198            successors = starting_nodes
199        for succ in successors:
200            succ_ = node_to_index[succ] if remove_label else succ
201            next_h = (h + 1) if h + 1 != H else 0
202            G_epi.add_edge((h, n_), (next_h, succ_))
203            if h < H - 1 and len(list(G_epi.successors((next_h, succ_)))) == 0:
204                add_successors(succ, next_h)
205
206    G_epi = nx.DiGraph()
207    for sn in starting_nodes:
208        add_successors(sn, 0)
209    return G_epi
210
211
212def instantiate_transitions(mdp: "BaseMDP", node: "NODE_TYPE"):
213    """
214    recursively instantiate the transitions of MDPs.
215    """
216    if not mdp.G.has_node(node) or len(list(mdp.G.successors(node))) == 0:
217        transition_distributions = dict()
218        for a in range(mdp.n_actions):
219            td = _instantiate_individual_transition(mdp, node, a)
220
221            if not td.is_deterministic:
222                mdp._are_all_transition_deterministic = False
223
224            for ns in td.next_nodes:
225                instantiate_transitions(mdp, ns)
226            transition_distributions[mdp._inverse_action_mapping(node, a)] = td
227
228        assert all(
229            action in transition_distributions.keys() for action in range(mdp.n_actions)
230        )
231        _add_node_info_class(mdp, node, transition_distributions)
232
233
234def _compute_transition(mdp: "BaseMDP", next_states, probs, node, action, next_node, p):
235    next_states.append(next_node)
236    probs.append(p)
237    if (
238        mdp._are_all_rewards_deterministic
239        and mdp.get_reward_distribution(node, action, next_node).dist.name
240        != "deterministic"
241    ):
242        mdp._are_all_rewards_deterministic = False
243    mdp.G.add_edge(node, next_node)
244
245
246def _add_node_info_class(
247    mdp: "BaseMDP",
248    n: "NODE_TYPE",
249    transition_distributions: Dict[int, NextStateSampler],
250):
251    """
252    adds a container class (NodeInfoClass) in the state n containing the transition distributions.
253
254    Parameters
255    ----------
256    n : NodeType
257        the state to which it adds the NodeInfoClass
258    transition_distributions : Dict[int, NextStateSampler]
259        the dictionary containing the transition distributions.
260    """
261    mdp.G.nodes[n]["info_class"] = NodeInfoClass(
262        transition_distributions=transition_distributions,
263        actions_visitation_count=dict.fromkeys(range(mdp.n_actions), 0),
264    )
265
266
267def _get_next_node(
268    mdp: "BaseMDP", node: "NODE_TYPE", action: "ACTION_TYPE"
269) -> List[Tuple["NODE_TYPE", float]]:
270    return [
271        (mdp.get_node_class()(**node_prms), prob)
272        for node_prms, prob in mdp._get_next_nodes_parameters(node, action)
273    ]
274
275
276def _instantiate_individual_transition(
277    mdp: "BaseMDP", node: "NODE_TYPE", action: "ACTION_TYPE"
278) -> NextStateSampler:
279    next_nodes = []
280    probs = []
281    for next_node, p in _get_next_node(mdp, node, action):
282        p1_lazy = 1.0 if mdp._p_lazy is None else (1 - mdp._p_lazy)
283        p = p1_lazy * p
284        p = (
285            p
286            if mdp._p_rand is None
287            else ((1 - mdp._p_rand) * p + p * mdp._p_rand / mdp.n_actions)
288        )
289        _compute_transition(mdp, next_nodes, probs, node, action, next_node, p)
290    if mdp._p_lazy is not None:
291        next_node = node
292        _compute_transition(
293            mdp, next_nodes, probs, node, action, next_node, mdp._p_lazy
294        )
295    if mdp._p_rand is not None:
296        for a in range(mdp.n_actions):
297            if a == action:
298                continue
299            for next_node, p in _get_next_node(mdp, node, a):
300                p = p1_lazy * mdp._p_rand * p / mdp.n_actions
301                # p = p1_lazy * p
302                _compute_transition(mdp, next_nodes, probs, node, action, next_node, p)
303
304    assert np.isclose(sum(probs), 1.0)
305
306    return NextStateSampler(
307        next_nodes=next_nodes,
308        probs=probs,
309        seed=mdp._produce_random_seed(),
310    )
@dataclass()
class NodeInfoClass:
20@dataclass()
21class NodeInfoClass:
22    """
23    The data class containing some quantities related to the nodes.
24    """
25
26    transition_distributions: Dict[int, NextStateSampler]
27    """A dictionary that maps actions to next state distributions."""
28    actions_visitation_count: Dict[int, int]
29    """The dictionary that keeps the count of how many times each action has been selected for the current node."""
30    state_visitation_count: int = 0
31    """The counter for the number of times the node has been visited."""
32
33    def update_visitation_counts(self, action: int = None):
34        self.state_visitation_count += 1
35        if action is not None:
36            self.actions_visitation_count[action] += 1
37
38    def sample_next_state(self, action: int):
39        return self.transition_distributions[action].sample()

The data class containing some quantities related to the nodes.

NodeInfoClass( transition_distributions: Dict[int, colosseum.mdp.utils.custom_samplers.NextStateSampler], actions_visitation_count: Dict[int, int], state_visitation_count: int = 0)
transition_distributions: Dict[int, colosseum.mdp.utils.custom_samplers.NextStateSampler]

A dictionary that maps actions to next state distributions.

actions_visitation_count: Dict[int, int]

The dictionary that keeps the count of how many times each action has been selected for the current node.

state_visitation_count: int = 0

The counter for the number of times the node has been visited.

def update_visitation_counts(self, action: int = None):
33    def update_visitation_counts(self, action: int = None):
34        self.state_visitation_count += 1
35        if action is not None:
36            self.actions_visitation_count[action] += 1
def sample_next_state(self, action: int):
38    def sample_next_state(self, action: int):
39        return self.transition_distributions[action].sample()
def get_transition_matrix_and_rewards( n_states: int, n_actions: int, G: networkx.classes.digraph.DiGraph, get_info_class: Callable[[Union[colosseum.mdp.custom_mdp.CustomNode, colosseum.mdp.river_swim.base.RiverSwimNode, colosseum.mdp.deep_sea.base.DeepSeaNode, colosseum.mdp.frozen_lake.base.FrozenLakeNode, colosseum.mdp.simple_grid.base.SimpleGridNode, colosseum.mdp.minigrid_empty.base.MiniGridEmptyNode, colosseum.mdp.minigrid_rooms.base.MiniGridRoomsNode, colosseum.mdp.taxi.base.TaxiNode]], colosseum.mdp.utils.mdp_creation.NodeInfoClass], get_reward_distribution: Callable[[Union[colosseum.mdp.custom_mdp.CustomNode, colosseum.mdp.river_swim.base.RiverSwimNode, colosseum.mdp.deep_sea.base.DeepSeaNode, colosseum.mdp.frozen_lake.base.FrozenLakeNode, colosseum.mdp.simple_grid.base.SimpleGridNode, colosseum.mdp.minigrid_empty.base.MiniGridEmptyNode, colosseum.mdp.minigrid_rooms.base.MiniGridRoomsNode, colosseum.mdp.taxi.base.TaxiNode], Union[int, float, numpy.ndarray], Union[colosseum.mdp.custom_mdp.CustomNode, colosseum.mdp.river_swim.base.RiverSwimNode, colosseum.mdp.deep_sea.base.DeepSeaNode, colosseum.mdp.frozen_lake.base.FrozenLakeNode, colosseum.mdp.simple_grid.base.SimpleGridNode, colosseum.mdp.minigrid_empty.base.MiniGridEmptyNode, colosseum.mdp.minigrid_rooms.base.MiniGridRoomsNode, colosseum.mdp.taxi.base.TaxiNode]], scipy.stats._distn_infrastructure.rv_continuous], node_to_index: Dict[Union[colosseum.mdp.custom_mdp.CustomNode, colosseum.mdp.river_swim.base.RiverSwimNode, colosseum.mdp.deep_sea.base.DeepSeaNode, colosseum.mdp.frozen_lake.base.FrozenLakeNode, colosseum.mdp.simple_grid.base.SimpleGridNode, colosseum.mdp.minigrid_empty.base.MiniGridEmptyNode, colosseum.mdp.minigrid_rooms.base.MiniGridRoomsNode, colosseum.mdp.taxi.base.TaxiNode], int], is_sparse: bool = False) -> Tuple[numpy.ndarray, numpy.ndarray]:
42def get_transition_matrix_and_rewards(
43    n_states: int,
44    n_actions: int,
45    G: nx.DiGraph,
46    get_info_class: Callable[["NODE_TYPE"], NodeInfoClass],
47    get_reward_distribution: Callable[
48        ["NODE_TYPE", "ACTION_TYPE", "NODE_TYPE"], rv_continuous
49    ],
50    node_to_index: Dict["NODE_TYPE", int],
51    is_sparse: bool = False,
52) -> Tuple[np.ndarray, np.ndarray]:
53    """
54    Returns
55    -------
56    np.ndarray
57        The transition 3d array of the MDP.
58    np.ndarray
59        The reward matrix of the MDP.
60    """
61    if not is_sparse:
62        mem_bytes = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES")
63        is_sparse = n_states ** 2 * n_actions * np.float32().itemsize > 0.1 * mem_bytes
64    if is_sparse:
65        T = dict()
66    else:
67        T = np.zeros(
68            (n_states, n_actions, n_states),
69            dtype=np.float32,
70        )
71
72    R = np.zeros((n_states, n_actions), dtype=np.float32)
73    for i, node in enumerate(G.nodes()):
74        for action, td in get_info_class(node).transition_distributions.items():
75            r = 0
76            for next_state, prob in zip(td.next_nodes, td.probs):
77                r += prob * get_reward_distribution(node, action, next_state).mean()
78                if is_sparse and (i, action, node_to_index[next_state]) not in T:
79                    T[i, action, node_to_index[next_state]] = prob
80                    continue
81                T[i, action, node_to_index[next_state]] += prob
82            R[i, action] = r
83
84    if is_sparse:
85        coords = [[], [], []]
86        data = []
87        for k, v in T.items():
88            coords[0].append(k[0])
89            coords[1].append(k[1])
90            coords[2].append(k[2])
91            data.append(np.float32(v))
92        T = sparse.COO(coords, data, shape=(n_states, n_actions, n_states))
93
94    assert np.isclose(T.sum(-1).todense() if is_sparse else T.sum(-1), 1).all()
95    assert np.isnan(R).sum() == 0
96    return T, R
Returns
  • np.ndarray: The transition 3d array of the MDP.
  • np.ndarray: The reward matrix of the MDP.
 99def get_episodic_transition_matrix_and_rewards(
100    H: int,
101    T: np.ndarray,
102    R: np.ndarray,
103    starting_node_sampler: NextStateSampler,
104    node_to_index: Dict["NODE_TYPE", int],
105) -> Tuple[np.ndarray, np.ndarray]:
106    """
107    Returns
108    -------
109    np.ndarray
110        The episodic transition 4d array of the MDP.
111    np.ndarray
112        The 3d episodic reward array of the MDP.
113    """
114    n_states, n_actions = R.shape
115    T_epi = np.zeros(
116        (H, n_states, n_actions, n_states),
117        dtype=np.float32,
118    )
119    for sn, p in starting_node_sampler.next_nodes_and_probs:
120        sn = node_to_index[sn]
121        T_epi[0, sn] = T[sn]
122        T_epi[H - 1, :, :, sn] = p
123    for h in range(1, H - 1):
124        for s in range(len(T)):
125            if T_epi[h - 1, :, :, s].sum() > 0:
126                T_epi[h, s] = T[s]
127    R = np.tile(R, (H, 1, 1))
128    R[-1] = 0.0
129    return T_epi, R
Returns
  • np.ndarray: The episodic transition 4d array of the MDP.
  • np.ndarray: The 3d episodic reward array of the MDP.
def get_continuous_form_episodic_transition_matrix_and_rewards( H: int, G: networkx.classes.digraph.DiGraph, T: numpy.ndarray, R: numpy.ndarray, starting_node_sampler: colosseum.mdp.utils.custom_samplers.NextStateSampler, node_to_index: Dict[Union[colosseum.mdp.custom_mdp.CustomNode, colosseum.mdp.river_swim.base.RiverSwimNode, colosseum.mdp.deep_sea.base.DeepSeaNode, colosseum.mdp.frozen_lake.base.FrozenLakeNode, colosseum.mdp.simple_grid.base.SimpleGridNode, colosseum.mdp.minigrid_empty.base.MiniGridEmptyNode, colosseum.mdp.minigrid_rooms.base.MiniGridRoomsNode, colosseum.mdp.taxi.base.TaxiNode], int]) -> Tuple[numpy.ndarray, numpy.ndarray]:
132def get_continuous_form_episodic_transition_matrix_and_rewards(
133    H: int,
134    G: nx.DiGraph,
135    T: np.ndarray,
136    R: np.ndarray,
137    starting_node_sampler: NextStateSampler,
138    node_to_index: Dict["NODE_TYPE", int],
139) -> Tuple[np.ndarray, np.ndarray]:
140    """
141    Returns
142    -------
143    np.ndarray
144        The transition 3d array for the continuous form of the MDP.
145    np.ndarray
146        The reward matrix for the continuous form of the MDP.
147    """
148    _, n_action = R.shape
149
150    try:
151        T_epi = np.zeros((len(G.nodes), n_action, len(G.nodes)), np.float32)
152        R_epi = np.zeros((len(G.nodes), n_action), np.float32)
153    except _ArrayMemoryError:
154        raise ValueError(
155            "It is not possible calculate the value for this MDP. Its continuous form is too large."
156        )
157
158    nodes = list(G.nodes)
159    for h, n in (
160        tqdm(
161            G.nodes,
162            desc="Continuous form episodic transition matrix and rewards",
163        )
164        if config.VERBOSE_LEVEL > 0
165        else G.nodes
166    ):
167        if h == H - 1:
168            for sn, p in starting_node_sampler.next_nodes_and_probs:
169                T_epi[nodes.index((h, n)), :, node_to_index[sn]] = p
170                R_epi[nodes.index((h, n))] = R[n]
171        else:
172            for hp1, nn in G.successors((h, n)):
173                T_epi[nodes.index((h, n)), :, nodes.index((hp1, nn))] = T[n, :, nn]
174                R_epi[nodes.index((h, n))] = R[n]
175
176    assert np.isclose(T_epi.sum(-1), 1.0).all()
177    return T_epi, R_epi
Returns
  • np.ndarray: The transition 3d array for the continuous form of the MDP.
  • np.ndarray: The reward matrix for the continuous form of the MDP.
180def get_episodic_graph(
181    G: nx.DiGraph,
182    H: int,
183    node_to_index: Dict["NODE_TYPE", int],
184    starting_nodes,
185    remove_label=False,
186) -> nx.DiGraph:
187    """
188    Returns
189    -------
190    nx.DiGraph
191        The graph of the MDP augmented with the time step in the state space.
192    """
193
194    def add_successors(n, h):
195        n_ = node_to_index[n] if remove_label else n
196        if h < H - 1:
197            successors = G.successors(n)
198        else:
199            successors = starting_nodes
200        for succ in successors:
201            succ_ = node_to_index[succ] if remove_label else succ
202            next_h = (h + 1) if h + 1 != H else 0
203            G_epi.add_edge((h, n_), (next_h, succ_))
204            if h < H - 1 and len(list(G_epi.successors((next_h, succ_)))) == 0:
205                add_successors(succ, next_h)
206
207    G_epi = nx.DiGraph()
208    for sn in starting_nodes:
209        add_successors(sn, 0)
210    return G_epi
Returns
  • nx.DiGraph: The graph of the MDP augmented with the time step in the state space.
213def instantiate_transitions(mdp: "BaseMDP", node: "NODE_TYPE"):
214    """
215    recursively instantiate the transitions of MDPs.
216    """
217    if not mdp.G.has_node(node) or len(list(mdp.G.successors(node))) == 0:
218        transition_distributions = dict()
219        for a in range(mdp.n_actions):
220            td = _instantiate_individual_transition(mdp, node, a)
221
222            if not td.is_deterministic:
223                mdp._are_all_transition_deterministic = False
224
225            for ns in td.next_nodes:
226                instantiate_transitions(mdp, ns)
227            transition_distributions[mdp._inverse_action_mapping(node, a)] = td
228
229        assert all(
230            action in transition_distributions.keys() for action in range(mdp.n_actions)
231        )
232        _add_node_info_class(mdp, node, transition_distributions)

recursively instantiate the transitions of MDPs.