colosseum.hardness.measures.diameter
1from io import StringIO 2from multiprocessing import Pool 3from typing import TYPE_CHECKING, Iterable 4 5import networkx as nx 6import numba 7import numpy as np 8import sparse 9from tqdm import tqdm, trange 10 11from colosseum import config 12from colosseum.dynamic_programming import discounted_value_iteration 13from colosseum.dynamic_programming import episodic_value_iteration 14from colosseum.utils import get_loop 15 16if TYPE_CHECKING: 17 from colosseum.mdp import NODE_TYPE 18 19 20def get_diameter(T: np.ndarray, is_episodic: bool, max_value: float = None) -> float: 21 """ 22 Returns 23 ------- 24 float 25 The diameter for the transition matrix given in input. The is_episodic is only necessary to check whether 26 the dimensionality of the transition matrix is effectively correct. Note that, for the episodic setting, this 27 computes the diameter for the augmented state space. 28 """ 29 assert (is_episodic and T.ndim == 4) or (not is_episodic and T.ndim == 3) 30 if is_episodic: 31 if config.get_available_cores() >= 3: 32 return _multi_thread_episodic_diameter_calculation(T, max_value=max_value) 33 return _single_thread_episodic_diameter_calculation(T, max_value=max_value) 34 35 if config.get_available_cores() >= 3: 36 return _continuous_multi_thread_diam(T, max_value=max_value) 37 if T.shape[-1] > 1000: 38 return _get_sparse_diameter(T, max_value=max_value) 39 return _continuous_single_thread_diam(T, max_value=max_value) 40 41 42def get_in_episodic_diameter( 43 H: int, 44 T: np.ndarray, 45 reachable_node: Iterable["NODE_TYPE"], 46 max_value: float = None, 47) -> float: 48 """ 49 Returns 50 ------- 51 float 52 The diameter for the in episodic formulation. Note that in this case, the diameter will always be less than 53 the time horizon. 54 """ 55 if config.get_available_cores() >= 3: 56 return _episodic_multi_thread_diam(H, T, reachable_node, max_value) 57 return _episodic_single_thread_diam(H, T, reachable_node, max_value) 58 59 60def get_diameter_for_determinsitic_MDPs(G: nx.DiGraph) -> float: 61 """ 62 Returns 63 ------- 64 float 65 The diameter for the given graph that represents an MDP. Note that this can be considerably slower than 66 the dynamic programming implementation we propose. 67 """ 68 A = nx.to_numpy_array(G, nonedge=np.inf) 69 n, m = A.shape 70 np.fill_diagonal(A, 0) # diagonal elements should be zero 71 for i in get_loop(range(n)): 72 A = np.minimum(A, A[i, :][np.newaxis, :] + A[:, i][:, np.newaxis]) 73 return np.max(A, where=A != 0, initial=-np.inf) 74 75 76def _continuous_diam_calculation( 77 es: int, T: np.ndarray, max_value: float = None 78) -> float: 79 """ 80 Returns 81 ------- 82 float 83 The highest optimal expected number of time step to reach es among all other states in the continuous setting. 84 """ 85 T_es = T.copy() 86 T_es[es] = 0 87 T_es[es, :, es] = 1 88 89 R_es = np.zeros(T.shape[:2], np.float32) - 1.0 90 R_es[es] = 0 91 res = discounted_value_iteration(T_es, R_es, 1, max_abs_value=max_value) 92 if res is None: 93 return None 94 _, V = res 95 return -V.min() 96 97 98def _continuous_single_thread_diam(T: np.ndarray, max_value: float = None) -> float: 99 diameter = 0 100 for es in get_loop(range(len(T))): 101 diameter = max( 102 diameter, _continuous_diam_calculation(es, T, max_value=max_value) 103 ) 104 if max_value is not None and diameter > max_value: 105 return None 106 return diameter 107 108 109def _continuous_multi_thread_diam(T: np.ndarray, max_value: float = None) -> float: 110 loop = get_loop(range(len(T))) 111 diameter = 0 112 with Pool(processes=config.get_available_cores()) as p: 113 for d in p.starmap( 114 _continuous_diam_calculation, [[es, T, max_value] for es in range(len(T))] 115 ): 116 if max_value is not None and d is None: 117 diameter = None 118 break 119 diameter = max(diameter, d) 120 if config.VERBOSE_LEVEL > 0: 121 loop.set_description(f"Max diam: {diameter:.2f}") 122 loop.update() 123 loop.refresh() 124 return diameter 125 126 127def _episodic_diam_calculation( 128 es: int, 129 T: np.ndarray, 130 H: int, 131 reachable_states: Iterable["NODE_TYPE"], 132 max_value: float = None, 133) -> float: 134 """ 135 returns the highest optimal expected number of time step to reach es among all other states in the episodic setting 136 """ 137 T_es = T.copy() 138 T_es[es] = 0 139 T_es[es, :, es] = 1 140 141 R_es = np.zeros(T.shape[:2], np.float32) - 1.0 142 R_es[es] = 0 143 res = episodic_value_iteration(H, T_es, R_es, max_abs_value=max_value) 144 if res is None: 145 return None 146 _, V = res 147 return max(-V[h, s] for h, s in reachable_states) 148 149 150def _episodic_single_thread_diam( 151 H: int, 152 T: np.ndarray, 153 reachable_states: Iterable["NODE_TYPE"], 154 max_value: float = None, 155) -> float: 156 assert ( 157 len(T.shape) == 3 158 ), "The transition matrix is incorrect. Please provide the non episodic transition matrix." 159 diam = 0 160 for es in get_loop(range(len(T))): 161 diam = max( 162 diam, 163 _episodic_diam_calculation(es, T, H, reachable_states, max_value=max_value), 164 ) 165 if max_value is not None and diam > max_value: 166 return None 167 return diam 168 169 170def _episodic_multi_thread_diam( 171 H: int, 172 T: np.ndarray, 173 reachable_states: Iterable["NODE_TYPE"], 174 max_value: float = None, 175) -> float: 176 loop = get_loop(range(len(T))) 177 diam = 0 178 with Pool(processes=config.get_available_cores()) as p: 179 for d in p.starmap( 180 _episodic_diam_calculation, 181 [[es, T, H, reachable_states, max_value] for es in range(len(T))], 182 ): 183 diam = max(diam, d) 184 if max_value is not None and diam > max_value: 185 return None 186 if config.VERBOSE_LEVEL > 0: 187 loop.set_description(f"Max diam: {diam:.2f}") 188 loop.update() 189 loop.refresh() 190 return diam 191 192 193def _single_thread_episodic_diameter_calculation( 194 T: np.ndarray, 195 states: Iterable["NODE_TYPE"] = None, 196 epsilon=0.001, 197 max_value: float = None, 198) -> float: 199 if states is None: 200 n_states = T.shape[-1] 201 states = range(n_states) 202 203 diameter = -np.inf 204 states = list(reversed(states)) 205 206 if config.VERBOSE_LEVEL != 0: 207 if type(config.VERBOSE_LEVEL) == int: 208 loop = tqdm(states, desc="Diameter calculation", mininterval=5) 209 else: 210 s = StringIO() 211 loop = tqdm(states, desc="Diameter calculation", file=s, mininterval=5) 212 else: 213 loop = states 214 215 dima_f = ( 216 _continuous_diameter_calculation 217 if T.ndim == 3 218 else _episodic_diameter_calculation 219 ) 220 for es in loop: 221 diameter = dima_f(es, T, diameter, epsilon, max_value) 222 if max_value is not None and diameter > max_value: 223 return None 224 if config.VERBOSE_LEVEL != 0 and type(config.VERBOSE_LEVEL) == str: 225 with open(config.VERBOSE_LEVEL, "a") as f: 226 f.write( 227 s.getvalue() 228 .split("\x1b")[0][2:] 229 .replace("\x00", "") 230 .replace("\n\r", "") 231 + "\n" 232 ) 233 s.truncate(0) 234 return diameter 235 236 237def _d_imap(args): 238 T, state_set, epsilon, sparse_T, max_value = args 239 if sparse_T: 240 n_states, num_sa = T.shape[-2:] 241 diam = 0 242 for es in state_set: 243 diam = _sparse_diameter_calculation( 244 es, T, int(num_sa / n_states), n_states, diam, epsilon, max_value 245 ) 246 return diam 247 return _single_thread_episodic_diameter_calculation( 248 T, state_set, epsilon, max_value 249 ) 250 251 252def _multi_thread_episodic_diameter_calculation( 253 T: np.ndarray, 254 epsilon: float = 0.001, 255 force_sparse: bool = False, 256 max_value: float = None, 257) -> float: 258 n_actions, n_states = T.shape[-2:] 259 diameter = -np.inf 260 261 sparse_T = False 262 if force_sparse or (n_states > 500 and T.ndim == 3): 263 T = np.moveaxis(T, -1, 0).reshape(n_states, n_states * n_actions) 264 T = sparse.COO(T) 265 sparse_T = True 266 267 states = np.arange(n_states)[::-1] 268 # np.random.shuffle(states) 269 states_sets = np.array_split(states, config.get_available_cores()) 270 inputs = [(T, state_set, epsilon, sparse_T, max_value) for state_set in states_sets] 271 if config.VERBOSE_LEVEL > 0: 272 loop = trange(len(inputs), desc="Diameter calculation", mininterval=5) 273 with Pool(processes=config.get_available_cores()) as p: 274 for d in p.imap_unordered(_d_imap, inputs): 275 if max_value is not None and d is None: 276 diameter = None 277 break 278 diameter = max(diameter, d) 279 if config.VERBOSE_LEVEL > 0: 280 loop.update() 281 loop.set_description(f"Max diam: {diameter:.2f}") 282 return diameter 283 284 285@numba.njit() 286def _episodic_diameter_calculation( 287 es: int, 288 T: np.ndarray, 289 max_diam: float, 290 epsilon: float = 0.001, 291 max_value: float = None, 292) -> float: 293 H, n_states, n_actions, _ = T.shape 294 ETs = np.zeros((H, n_states), dtype=np.float32) 295 ET_minh = np.zeros((n_states,), dtype=np.float32) 296 for t in range(1_000_000): 297 ETs_old = ETs.copy() 298 ETs[-1] = T[-1, 0, 0] @ (1 + ETs[0]) 299 for hh in range(1, H): 300 h = H - hh 301 for j in range(n_states): 302 if j != es: 303 s = np.zeros(n_actions, np.float32) 304 for ns in range(n_states): 305 if ns != es: 306 s += T[h - 1, j, :, ns] * (1 + ETs[h, ns]) 307 ETs[h - 1, j] = np.min(T[h - 1, j, :, es] + s) 308 if max_value is not None and ETs[h - 1, j] > max_value: 309 return None 310 311 diff = np.abs(ETs_old - ETs).max() 312 for ss in range(n_states): 313 ccc = ETs[:, ss] 314 ET_minh[ss] = np.min(ccc[ccc > 0]) 315 cur_diam = ET_minh.max() 316 if diff < epsilon or (diff < 0.01 and cur_diam - 1 < max_diam): 317 break 318 return max(max_diam, cur_diam) 319 320 321@numba.njit() 322def _continuous_diameter_calculation( 323 es: "NODE_TYPE", 324 T: np.ndarray, 325 max_diam: float, 326 epsilon: float = 0.001, 327 max_value: float = None, 328) -> float: 329 n_states, n_actions, _ = T.shape 330 ETs = np.zeros(n_states, dtype=np.float32) 331 for t in range(1_000_000): 332 ETs_old = ETs.copy() 333 for j in range(n_states): 334 if j != es: 335 s = np.zeros(n_actions, np.float32) 336 for ns in range(n_states): 337 if ns != es: 338 s += T[j, :, ns] * (1 + ETs[ns]) 339 ETs[j] = np.min(T[j, :, es] + s) 340 if max_value is not None and ETs[j].max() > max_value: 341 return None 342 343 diff = np.abs(ETs_old - ETs).max() 344 if diff < epsilon or (diff < 0.05 and ETs.max() - 1 < max_diam): 345 break 346 return max(max_diam, ETs.max()) 347 348 349def _sparse_diameter_calculation( 350 es: "NODE_TYPE", 351 T: np.ndarray, 352 n_actions: int, 353 n_states: int, 354 max_diam: float, 355 epsilon: float = 0.001, 356 max_value: float = None, 357) -> float: 358 ETs = np.zeros(n_states - 1, dtype=np.float32) 359 360 next_ets = ( 361 lambda TT, ET: (TT * ET.reshape((-1, 1))) 362 .reshape((n_states - 1, n_states, n_actions)) 363 .sum(0) 364 .todense() 365 ) 366 selector = np.ones(n_states, dtype=bool) 367 selector[es] = False 368 Te = T[es].reshape((n_states, n_actions)) 369 T_me = T[selector] 370 371 for j in range(1_000_000): 372 ETs_old = ETs.copy() 373 ETs = (Te + next_ets(T_me, 1 + ETs)).min(1)[selector] 374 diff = np.abs(ETs_old - ETs).max() 375 if diff < epsilon or (diff < 0.05 and ETs.max() - 1 < max_diam): 376 break 377 if max_value is not None and ETs.max() > max_value: 378 return None 379 return max(max_diam, ETs.max()) 380 381 382def _get_sparse_diameter( 383 T: np.ndarray, epsilon: float = 0.001, max_value: float = None 384) -> float: 385 n_states, n_actions, _ = T.shape 386 387 T = np.moveaxis(T, -1, 0).reshape(n_states, n_states * n_actions) 388 T = sparse.COO(T) 389 next_ets = ( 390 lambda TT, ET: (TT * ET.reshape(-1, 1)) 391 .reshape((n_states - 1, n_states, n_actions)) 392 .sum(0) 393 .todense() 394 ) 395 396 diameter = -np.inf 397 loop = ( 398 trange(n_states, desc="Calculating diameter", mininterval=5) 399 if config.VERBOSE_LEVEL > 0 400 else range(n_states) 401 ) 402 selector = np.ones(n_states, dtype=bool) 403 for i in loop: 404 selector[i] = False 405 selector[i - 1] = True 406 Te = T[i].reshape((n_states, n_actions)) 407 T_me = T[selector] 408 409 ETs = np.zeros(n_states - 1) 410 diff = np.inf 411 for j in range(1_000_000): 412 ETs_old = ETs.copy() 413 ETs = (Te + next_ets(T_me, 1 + ETs)).min(1)[selector] 414 diff = np.abs(ETs_old - ETs).max() 415 if diff < epsilon or (diff < 0.05 and ETs.max() - 1 < diameter): 416 break 417 diameter = max(diameter, ETs.max()) 418 if max_value is not None and diameter > max_value: 419 return None 420 return diameter
def
get_diameter(T: numpy.ndarray, is_episodic: bool, max_value: float = None) -> float:
21def get_diameter(T: np.ndarray, is_episodic: bool, max_value: float = None) -> float: 22 """ 23 Returns 24 ------- 25 float 26 The diameter for the transition matrix given in input. The is_episodic is only necessary to check whether 27 the dimensionality of the transition matrix is effectively correct. Note that, for the episodic setting, this 28 computes the diameter for the augmented state space. 29 """ 30 assert (is_episodic and T.ndim == 4) or (not is_episodic and T.ndim == 3) 31 if is_episodic: 32 if config.get_available_cores() >= 3: 33 return _multi_thread_episodic_diameter_calculation(T, max_value=max_value) 34 return _single_thread_episodic_diameter_calculation(T, max_value=max_value) 35 36 if config.get_available_cores() >= 3: 37 return _continuous_multi_thread_diam(T, max_value=max_value) 38 if T.shape[-1] > 1000: 39 return _get_sparse_diameter(T, max_value=max_value) 40 return _continuous_single_thread_diam(T, max_value=max_value)
Returns
- float: The diameter for the transition matrix given in input. The is_episodic is only necessary to check whether
- the dimensionality of the transition matrix is effectively correct. Note that, for the episodic setting, this
- computes the diameter for the augmented state space.
def
get_in_episodic_diameter( H: int, T: numpy.ndarray, reachable_node: Iterable[Union[colosseum.mdp.custom_mdp.CustomNode, colosseum.mdp.river_swim.base.RiverSwimNode, colosseum.mdp.deep_sea.base.DeepSeaNode, colosseum.mdp.frozen_lake.base.FrozenLakeNode, colosseum.mdp.simple_grid.base.SimpleGridNode, colosseum.mdp.minigrid_empty.base.MiniGridEmptyNode, colosseum.mdp.minigrid_rooms.base.MiniGridRoomsNode, colosseum.mdp.taxi.base.TaxiNode]], max_value: float = None) -> float:
43def get_in_episodic_diameter( 44 H: int, 45 T: np.ndarray, 46 reachable_node: Iterable["NODE_TYPE"], 47 max_value: float = None, 48) -> float: 49 """ 50 Returns 51 ------- 52 float 53 The diameter for the in episodic formulation. Note that in this case, the diameter will always be less than 54 the time horizon. 55 """ 56 if config.get_available_cores() >= 3: 57 return _episodic_multi_thread_diam(H, T, reachable_node, max_value) 58 return _episodic_single_thread_diam(H, T, reachable_node, max_value)
Returns
- float: The diameter for the in episodic formulation. Note that in this case, the diameter will always be less than
- the time horizon.
def
get_diameter_for_determinsitic_MDPs(G: networkx.classes.digraph.DiGraph) -> float:
61def get_diameter_for_determinsitic_MDPs(G: nx.DiGraph) -> float: 62 """ 63 Returns 64 ------- 65 float 66 The diameter for the given graph that represents an MDP. Note that this can be considerably slower than 67 the dynamic programming implementation we propose. 68 """ 69 A = nx.to_numpy_array(G, nonedge=np.inf) 70 n, m = A.shape 71 np.fill_diagonal(A, 0) # diagonal elements should be zero 72 for i in get_loop(range(n)): 73 A = np.minimum(A, A[i, :][np.newaxis, :] + A[:, i][:, np.newaxis]) 74 return np.max(A, where=A != 0, initial=-np.inf)
Returns
- float: The diameter for the given graph that represents an MDP. Note that this can be considerably slower than
- the dynamic programming implementation we propose.