colosseum.hardness.measures.diameter

  1from io import StringIO
  2from multiprocessing import Pool
  3from typing import TYPE_CHECKING, Iterable
  4
  5import networkx as nx
  6import numba
  7import numpy as np
  8import sparse
  9from tqdm import tqdm, trange
 10
 11from colosseum import config
 12from colosseum.dynamic_programming import discounted_value_iteration
 13from colosseum.dynamic_programming import episodic_value_iteration
 14from colosseum.utils import get_loop
 15
 16if TYPE_CHECKING:
 17    from colosseum.mdp import NODE_TYPE
 18
 19
 20def get_diameter(T: np.ndarray, is_episodic: bool, max_value: float = None) -> float:
 21    """
 22    Returns
 23    -------
 24    float
 25        The diameter for the transition matrix given in input. The is_episodic is only necessary to check whether
 26    the dimensionality of the transition matrix is effectively correct. Note that, for the episodic setting, this
 27    computes the diameter for the augmented state space.
 28    """
 29    assert (is_episodic and T.ndim == 4) or (not is_episodic and T.ndim == 3)
 30    if is_episodic:
 31        if config.get_available_cores() >= 3:
 32            return _multi_thread_episodic_diameter_calculation(T, max_value=max_value)
 33        return _single_thread_episodic_diameter_calculation(T, max_value=max_value)
 34
 35    if config.get_available_cores() >= 3:
 36        return _continuous_multi_thread_diam(T, max_value=max_value)
 37    if T.shape[-1] > 1000:
 38        return _get_sparse_diameter(T, max_value=max_value)
 39    return _continuous_single_thread_diam(T, max_value=max_value)
 40
 41
 42def get_in_episodic_diameter(
 43    H: int,
 44    T: np.ndarray,
 45    reachable_node: Iterable["NODE_TYPE"],
 46    max_value: float = None,
 47) -> float:
 48    """
 49    Returns
 50    -------
 51    float
 52        The diameter for the in episodic formulation. Note that in this case, the diameter will always be less than
 53    the time horizon.
 54    """
 55    if config.get_available_cores() >= 3:
 56        return _episodic_multi_thread_diam(H, T, reachable_node, max_value)
 57    return _episodic_single_thread_diam(H, T, reachable_node, max_value)
 58
 59
 60def get_diameter_for_determinsitic_MDPs(G: nx.DiGraph) -> float:
 61    """
 62    Returns
 63    -------
 64    float
 65        The diameter for the given graph that represents an MDP. Note that this can be considerably slower than
 66    the dynamic programming implementation we propose.
 67    """
 68    A = nx.to_numpy_array(G, nonedge=np.inf)
 69    n, m = A.shape
 70    np.fill_diagonal(A, 0)  # diagonal elements should be zero
 71    for i in get_loop(range(n)):
 72        A = np.minimum(A, A[i, :][np.newaxis, :] + A[:, i][:, np.newaxis])
 73    return np.max(A, where=A != 0, initial=-np.inf)
 74
 75
 76def _continuous_diam_calculation(
 77    es: int, T: np.ndarray, max_value: float = None
 78) -> float:
 79    """
 80    Returns
 81    -------
 82    float
 83        The highest optimal expected number of time step to reach es among all other states in the continuous setting.
 84    """
 85    T_es = T.copy()
 86    T_es[es] = 0
 87    T_es[es, :, es] = 1
 88
 89    R_es = np.zeros(T.shape[:2], np.float32) - 1.0
 90    R_es[es] = 0
 91    res = discounted_value_iteration(T_es, R_es, 1, max_abs_value=max_value)
 92    if res is None:
 93        return None
 94    _, V = res
 95    return -V.min()
 96
 97
 98def _continuous_single_thread_diam(T: np.ndarray, max_value: float = None) -> float:
 99    diameter = 0
100    for es in get_loop(range(len(T))):
101        diameter = max(
102            diameter, _continuous_diam_calculation(es, T, max_value=max_value)
103        )
104        if max_value is not None and diameter > max_value:
105            return None
106    return diameter
107
108
109def _continuous_multi_thread_diam(T: np.ndarray, max_value: float = None) -> float:
110    loop = get_loop(range(len(T)))
111    diameter = 0
112    with Pool(processes=config.get_available_cores()) as p:
113        for d in p.starmap(
114            _continuous_diam_calculation, [[es, T, max_value] for es in range(len(T))]
115        ):
116            if max_value is not None and d is None:
117                diameter = None
118                break
119            diameter = max(diameter, d)
120            if config.VERBOSE_LEVEL > 0:
121                loop.set_description(f"Max diam: {diameter:.2f}")
122                loop.update()
123                loop.refresh()
124    return diameter
125
126
127def _episodic_diam_calculation(
128    es: int,
129    T: np.ndarray,
130    H: int,
131    reachable_states: Iterable["NODE_TYPE"],
132    max_value: float = None,
133) -> float:
134    """
135    returns the highest optimal expected number of time step to reach es among all other states in the episodic setting
136    """
137    T_es = T.copy()
138    T_es[es] = 0
139    T_es[es, :, es] = 1
140
141    R_es = np.zeros(T.shape[:2], np.float32) - 1.0
142    R_es[es] = 0
143    res = episodic_value_iteration(H, T_es, R_es, max_abs_value=max_value)
144    if res is None:
145        return None
146    _, V = res
147    return max(-V[h, s] for h, s in reachable_states)
148
149
150def _episodic_single_thread_diam(
151    H: int,
152    T: np.ndarray,
153    reachable_states: Iterable["NODE_TYPE"],
154    max_value: float = None,
155) -> float:
156    assert (
157        len(T.shape) == 3
158    ), "The transition matrix is incorrect. Please provide the non episodic transition matrix."
159    diam = 0
160    for es in get_loop(range(len(T))):
161        diam = max(
162            diam,
163            _episodic_diam_calculation(es, T, H, reachable_states, max_value=max_value),
164        )
165        if max_value is not None and diam > max_value:
166            return None
167    return diam
168
169
170def _episodic_multi_thread_diam(
171    H: int,
172    T: np.ndarray,
173    reachable_states: Iterable["NODE_TYPE"],
174    max_value: float = None,
175) -> float:
176    loop = get_loop(range(len(T)))
177    diam = 0
178    with Pool(processes=config.get_available_cores()) as p:
179        for d in p.starmap(
180            _episodic_diam_calculation,
181            [[es, T, H, reachable_states, max_value] for es in range(len(T))],
182        ):
183            diam = max(diam, d)
184            if max_value is not None and diam > max_value:
185                return None
186            if config.VERBOSE_LEVEL > 0:
187                loop.set_description(f"Max diam: {diam:.2f}")
188                loop.update()
189                loop.refresh()
190    return diam
191
192
193def _single_thread_episodic_diameter_calculation(
194    T: np.ndarray,
195    states: Iterable["NODE_TYPE"] = None,
196    epsilon=0.001,
197    max_value: float = None,
198) -> float:
199    if states is None:
200        n_states = T.shape[-1]
201        states = range(n_states)
202
203    diameter = -np.inf
204    states = list(reversed(states))
205
206    if config.VERBOSE_LEVEL != 0:
207        if type(config.VERBOSE_LEVEL) == int:
208            loop = tqdm(states, desc="Diameter calculation", mininterval=5)
209        else:
210            s = StringIO()
211            loop = tqdm(states, desc="Diameter calculation", file=s, mininterval=5)
212    else:
213        loop = states
214
215    dima_f = (
216        _continuous_diameter_calculation
217        if T.ndim == 3
218        else _episodic_diameter_calculation
219    )
220    for es in loop:
221        diameter = dima_f(es, T, diameter, epsilon, max_value)
222        if max_value is not None and diameter > max_value:
223            return None
224        if config.VERBOSE_LEVEL != 0 and type(config.VERBOSE_LEVEL) == str:
225            with open(config.VERBOSE_LEVEL, "a") as f:
226                f.write(
227                    s.getvalue()
228                    .split("\x1b")[0][2:]
229                    .replace("\x00", "")
230                    .replace("\n\r", "")
231                    + "\n"
232                )
233            s.truncate(0)
234    return diameter
235
236
237def _d_imap(args):
238    T, state_set, epsilon, sparse_T, max_value = args
239    if sparse_T:
240        n_states, num_sa = T.shape[-2:]
241        diam = 0
242        for es in state_set:
243            diam = _sparse_diameter_calculation(
244                es, T, int(num_sa / n_states), n_states, diam, epsilon, max_value
245            )
246        return diam
247    return _single_thread_episodic_diameter_calculation(
248        T, state_set, epsilon, max_value
249    )
250
251
252def _multi_thread_episodic_diameter_calculation(
253    T: np.ndarray,
254    epsilon: float = 0.001,
255    force_sparse: bool = False,
256    max_value: float = None,
257) -> float:
258    n_actions, n_states = T.shape[-2:]
259    diameter = -np.inf
260
261    sparse_T = False
262    if force_sparse or (n_states > 500 and T.ndim == 3):
263        T = np.moveaxis(T, -1, 0).reshape(n_states, n_states * n_actions)
264        T = sparse.COO(T)
265        sparse_T = True
266
267    states = np.arange(n_states)[::-1]
268    # np.random.shuffle(states)
269    states_sets = np.array_split(states, config.get_available_cores())
270    inputs = [(T, state_set, epsilon, sparse_T, max_value) for state_set in states_sets]
271    if config.VERBOSE_LEVEL > 0:
272        loop = trange(len(inputs), desc="Diameter calculation", mininterval=5)
273    with Pool(processes=config.get_available_cores()) as p:
274        for d in p.imap_unordered(_d_imap, inputs):
275            if max_value is not None and d is None:
276                diameter = None
277                break
278            diameter = max(diameter, d)
279            if config.VERBOSE_LEVEL > 0:
280                loop.update()
281                loop.set_description(f"Max diam: {diameter:.2f}")
282    return diameter
283
284
285@numba.njit()
286def _episodic_diameter_calculation(
287    es: int,
288    T: np.ndarray,
289    max_diam: float,
290    epsilon: float = 0.001,
291    max_value: float = None,
292) -> float:
293    H, n_states, n_actions, _ = T.shape
294    ETs = np.zeros((H, n_states), dtype=np.float32)
295    ET_minh = np.zeros((n_states,), dtype=np.float32)
296    for t in range(1_000_000):
297        ETs_old = ETs.copy()
298        ETs[-1] = T[-1, 0, 0] @ (1 + ETs[0])
299        for hh in range(1, H):
300            h = H - hh
301            for j in range(n_states):
302                if j != es:
303                    s = np.zeros(n_actions, np.float32)
304                    for ns in range(n_states):
305                        if ns != es:
306                            s += T[h - 1, j, :, ns] * (1 + ETs[h, ns])
307                    ETs[h - 1, j] = np.min(T[h - 1, j, :, es] + s)
308                    if max_value is not None and ETs[h - 1, j] > max_value:
309                        return None
310
311        diff = np.abs(ETs_old - ETs).max()
312        for ss in range(n_states):
313            ccc = ETs[:, ss]
314            ET_minh[ss] = np.min(ccc[ccc > 0])
315        cur_diam = ET_minh.max()
316        if diff < epsilon or (diff < 0.01 and cur_diam - 1 < max_diam):
317            break
318    return max(max_diam, cur_diam)
319
320
321@numba.njit()
322def _continuous_diameter_calculation(
323    es: "NODE_TYPE",
324    T: np.ndarray,
325    max_diam: float,
326    epsilon: float = 0.001,
327    max_value: float = None,
328) -> float:
329    n_states, n_actions, _ = T.shape
330    ETs = np.zeros(n_states, dtype=np.float32)
331    for t in range(1_000_000):
332        ETs_old = ETs.copy()
333        for j in range(n_states):
334            if j != es:
335                s = np.zeros(n_actions, np.float32)
336                for ns in range(n_states):
337                    if ns != es:
338                        s += T[j, :, ns] * (1 + ETs[ns])
339                ETs[j] = np.min(T[j, :, es] + s)
340                if max_value is not None and ETs[j].max() > max_value:
341                    return None
342
343        diff = np.abs(ETs_old - ETs).max()
344        if diff < epsilon or (diff < 0.05 and ETs.max() - 1 < max_diam):
345            break
346    return max(max_diam, ETs.max())
347
348
349def _sparse_diameter_calculation(
350    es: "NODE_TYPE",
351    T: np.ndarray,
352    n_actions: int,
353    n_states: int,
354    max_diam: float,
355    epsilon: float = 0.001,
356    max_value: float = None,
357) -> float:
358    ETs = np.zeros(n_states - 1, dtype=np.float32)
359
360    next_ets = (
361        lambda TT, ET: (TT * ET.reshape((-1, 1)))
362        .reshape((n_states - 1, n_states, n_actions))
363        .sum(0)
364        .todense()
365    )
366    selector = np.ones(n_states, dtype=bool)
367    selector[es] = False
368    Te = T[es].reshape((n_states, n_actions))
369    T_me = T[selector]
370
371    for j in range(1_000_000):
372        ETs_old = ETs.copy()
373        ETs = (Te + next_ets(T_me, 1 + ETs)).min(1)[selector]
374        diff = np.abs(ETs_old - ETs).max()
375        if diff < epsilon or (diff < 0.05 and ETs.max() - 1 < max_diam):
376            break
377        if max_value is not None and ETs.max() > max_value:
378            return None
379    return max(max_diam, ETs.max())
380
381
382def _get_sparse_diameter(
383    T: np.ndarray, epsilon: float = 0.001, max_value: float = None
384) -> float:
385    n_states, n_actions, _ = T.shape
386
387    T = np.moveaxis(T, -1, 0).reshape(n_states, n_states * n_actions)
388    T = sparse.COO(T)
389    next_ets = (
390        lambda TT, ET: (TT * ET.reshape(-1, 1))
391        .reshape((n_states - 1, n_states, n_actions))
392        .sum(0)
393        .todense()
394    )
395
396    diameter = -np.inf
397    loop = (
398        trange(n_states, desc="Calculating diameter", mininterval=5)
399        if config.VERBOSE_LEVEL > 0
400        else range(n_states)
401    )
402    selector = np.ones(n_states, dtype=bool)
403    for i in loop:
404        selector[i] = False
405        selector[i - 1] = True
406        Te = T[i].reshape((n_states, n_actions))
407        T_me = T[selector]
408
409        ETs = np.zeros(n_states - 1)
410        diff = np.inf
411        for j in range(1_000_000):
412            ETs_old = ETs.copy()
413            ETs = (Te + next_ets(T_me, 1 + ETs)).min(1)[selector]
414            diff = np.abs(ETs_old - ETs).max()
415            if diff < epsilon or (diff < 0.05 and ETs.max() - 1 < diameter):
416                break
417        diameter = max(diameter, ETs.max())
418        if max_value is not None and diameter > max_value:
419            return None
420    return diameter
def get_diameter(T: numpy.ndarray, is_episodic: bool, max_value: float = None) -> float:
21def get_diameter(T: np.ndarray, is_episodic: bool, max_value: float = None) -> float:
22    """
23    Returns
24    -------
25    float
26        The diameter for the transition matrix given in input. The is_episodic is only necessary to check whether
27    the dimensionality of the transition matrix is effectively correct. Note that, for the episodic setting, this
28    computes the diameter for the augmented state space.
29    """
30    assert (is_episodic and T.ndim == 4) or (not is_episodic and T.ndim == 3)
31    if is_episodic:
32        if config.get_available_cores() >= 3:
33            return _multi_thread_episodic_diameter_calculation(T, max_value=max_value)
34        return _single_thread_episodic_diameter_calculation(T, max_value=max_value)
35
36    if config.get_available_cores() >= 3:
37        return _continuous_multi_thread_diam(T, max_value=max_value)
38    if T.shape[-1] > 1000:
39        return _get_sparse_diameter(T, max_value=max_value)
40    return _continuous_single_thread_diam(T, max_value=max_value)
Returns
  • float: The diameter for the transition matrix given in input. The is_episodic is only necessary to check whether
  • the dimensionality of the transition matrix is effectively correct. Note that, for the episodic setting, this
  • computes the diameter for the augmented state space.
43def get_in_episodic_diameter(
44    H: int,
45    T: np.ndarray,
46    reachable_node: Iterable["NODE_TYPE"],
47    max_value: float = None,
48) -> float:
49    """
50    Returns
51    -------
52    float
53        The diameter for the in episodic formulation. Note that in this case, the diameter will always be less than
54    the time horizon.
55    """
56    if config.get_available_cores() >= 3:
57        return _episodic_multi_thread_diam(H, T, reachable_node, max_value)
58    return _episodic_single_thread_diam(H, T, reachable_node, max_value)
Returns
  • float: The diameter for the in episodic formulation. Note that in this case, the diameter will always be less than
  • the time horizon.
def get_diameter_for_determinsitic_MDPs(G: networkx.classes.digraph.DiGraph) -> float:
61def get_diameter_for_determinsitic_MDPs(G: nx.DiGraph) -> float:
62    """
63    Returns
64    -------
65    float
66        The diameter for the given graph that represents an MDP. Note that this can be considerably slower than
67    the dynamic programming implementation we propose.
68    """
69    A = nx.to_numpy_array(G, nonedge=np.inf)
70    n, m = A.shape
71    np.fill_diagonal(A, 0)  # diagonal elements should be zero
72    for i in get_loop(range(n)):
73        A = np.minimum(A, A[i, :][np.newaxis, :] + A[:, i][:, np.newaxis])
74    return np.max(A, where=A != 0, initial=-np.inf)
Returns
  • float: The diameter for the given graph that represents an MDP. Note that this can be considerably slower than
  • the dynamic programming implementation we propose.