colosseum.mdp.utils.markov_chain
import os
from typing import Iterable, List, Optional, Tuple

import networkx as nx
import numba
import numpy as np
import scipy
import scipy.sparse.linalg  # needed for scipy.sparse.linalg.eigs used in _eigen_method
from pydtmc import MarkovChain
from scipy.sparse import coo_matrix, csr_matrix


def get_average_reward(
    T: np.ndarray,
    R: np.ndarray,
    policy: np.ndarray,
    next_states_and_probs: Optional,
    sparse_threshold_size: int = 500 * 500,
) -> float:
    """
    Returns
    -------
    float
        The expected average reward when following the given policy for the MDP defined by the given transition and
        rewards matrices.
    """
    assert np.isclose(policy.sum(-1), 1).all(), "the policy specification is incorrect."

    average_rewards = get_average_rewards(R, policy)
    tps = get_transition_probabilities(T, policy)
    sd = get_stationary_distribution(tps, next_states_and_probs, sparse_threshold_size)
    return (average_rewards * sd).sum()


def get_average_rewards(R: np.ndarray, policy: np.ndarray) -> np.ndarray:
    """
    Returns
    -------
    np.ndarray
        The expected rewards for each state when following the given policy.
    """
    return np.einsum("sa,sa->s", R, policy)


def get_transition_probabilities(T: np.ndarray, policy: np.ndarray) -> np.ndarray:
    """
    Returns
    -------
    np.ndarray
        The transition probability matrix of the Markov chain yielded by the given policy.
    """
    return np.minimum(1.0, np.einsum("saj,sa->sj", T, policy))


def get_markov_chain(transition_probabilities: np.ndarray) -> MarkovChain:
    """
    Returns
    -------
    MarkovChain
        The Markov chain object from the pydtmc package.
    """
    return MarkovChain(transition_probabilities)


def get_stationary_distribution(
    tps: np.ndarray,
    starting_states_and_probs: Iterable[Tuple[int, float]],
    sparse_threshold_size: int = 500 * 500,
) -> np.ndarray:
    """
    returns the stationary distribution of the transition matrix. If there are multiple recurrent classes, and thus
    multiple stationary distributions, the returned stationary distribution is the average of the per-class stationary
    distributions weighted by the starting-state distribution.

    Parameters
    ----------
    tps : np.ndarray
        The transition probabilities matrix.
    starting_states_and_probs : Iterable[Tuple[int, float]]
        The iterable over the starting states and their corresponding probabilities.
    sparse_threshold_size : int
        The size of the transition probabilities matrix above which sparse matrices are used.

    Returns
    -------
    np.ndarray
        The stationary distribution of the transition matrix.
    """
    if tps.size > sparse_threshold_size:
        G = nx.DiGraph(coo_matrix(tps))
    else:
        G = nx.DiGraph(tps)

    # Obtain the recurrent classes
    recurrent_classes = list(map(tuple, nx.attracting_components(G)))
    if len(recurrent_classes) == 1 and len(recurrent_classes[0]) < len(tps):
        sd = np.zeros(len(tps), np.float32)
        if len(recurrent_classes[0]) == 1:
            sd[recurrent_classes[0][0]] = 1
        else:
            sd[list(recurrent_classes[0])] = _get_stationary_distribution(
                tps[np.ix_(recurrent_classes[0], recurrent_classes[0])],
                sparse_threshold_size,
            )
        return sd

    elif len(recurrent_classes) > 1 and len(recurrent_classes[0]) < len(tps):

        sd = np.zeros(len(tps))
        if len(recurrent_classes) > 1:
            # Weight the stationary distributions of the recurrent classes with the starting-state distribution
            for ss, p in starting_states_and_probs:
                for recurrent_class in recurrent_classes:
                    try:
                        # this means that the starting state ss is connected to recurrent_class
                        nx.shortest_path_length(G, ss, recurrent_class[0])

                        # Weighting the stationary distribution with the probability of the starting state
                        sd[list(recurrent_class)] += p * _get_stationary_distribution(
                            tps[np.ix_(recurrent_class, recurrent_class)],
                            sparse_threshold_size,
                        )
                        break
                    except nx.exception.NetworkXNoPath:
                        pass
        else:
            # No need to weight with the starting-state distribution since there is only one recurrent class
            sd[list(recurrent_classes[0])] += _get_stationary_distribution(
                tps[np.ix_(recurrent_classes[0], recurrent_classes[0])],
                sparse_threshold_size,
            )

        return sd

    sd = _get_stationary_distribution(tps)
    return sd


@numba.njit()
def _gth_solve_numba(tps: np.ndarray) -> np.ndarray:
    """
    returns the stationary distribution of a transition probabilities matrix with a single recurrent class using the
    GTH method.
    """
    a = np.copy(tps).astype(np.float64)
    n = a.shape[0]

    for i in range(n - 1):
        scale = np.sum(a[i, i + 1 : n])

        if scale <= 0.0:  # pragma: no cover
            n = i + 1
            break

        a[i + 1 : n, i] /= scale
        a[i + 1 : n, i + 1 : n] += np.outer(
            a[i + 1 : n, i : i + 1], a[i : i + 1, i + 1 : n]
        )

    x = np.zeros(n, np.float64)
    x[n - 1] = 1.0
    x[n - 2] = a[n - 1, n - 2]
    for i in range(n - 3, -1, -1):
        x[i] = np.sum(x[i + 1 : n] * a[i + 1 : n, i])
    x /= np.sum(x)
    return x


def _convertToRateMatrix(tps: csr_matrix):
    """
    converts the initial matrix to a rate matrix. We make all rows of Q sum to zero by subtracting the row sums from
    the diagonal.
    """
    rowSums = tps.sum(axis=1).getA1()
    idxRange = np.arange(tps.shape[0])
    Qdiag = coo_matrix((rowSums, (idxRange, idxRange)), shape=tps.shape).tocsr()
    return tps - Qdiag


def _eigen_method(tps, tol=1e-8, maxiter=1e5):
    """
    returns the stationary distribution of a transition probabilities matrix with a single recurrent class using the
    eigenvalue method.
    """
    Q = _convertToRateMatrix(tps)
    size = Q.shape[0]
    guess = np.ones(size, dtype=float)
    w, v = scipy.sparse.linalg.eigs(
        Q.T, k=1, v0=guess, sigma=1e-6, which="LM", tol=tol, maxiter=maxiter
    )
    pi = v[:, 0].real
    pi /= pi.sum()
    return np.maximum(pi, 0.0)


def _get_stationary_distribution(
    tps: np.ndarray, sparse_threshold_size: int = 500 * 500
) -> np.ndarray:
    os.makedirs("tmp", exist_ok=True)

    if len(tps) == 1:
        return np.ones(1, np.float32)

    if tps.size > sparse_threshold_size:
        sd = _eigen_method(csr_matrix(tps))
        if np.isnan(sd).any() or not np.isclose(sd.sum(), 1.0):
            # sometimes the eigen method fails, so we fall back to GTH, which is slower but more reliable
            os.makedirs("tmp/sd_failures", exist_ok=True)
            for i in range(1000):
                if not os.path.isfile(f"tmp/sd_failures/tps{i}.npy"):
                    np.save(f"tmp/sd_failures/tps{i}.npy", tps)
                    break

            sd = _gth_solve_numba(tps)
            if not np.isclose(sd.sum(), 1.0) and np.isclose(sd.sum(), 1, rtol=4):
                sd /= sd.sum()
            assert not (np.isnan(sd).any() or not np.isclose(sd.sum(), 1.0)), np.save(
                "tmp/tps.npy", tps
            )
        return sd

    sd = _gth_solve_numba(tps)
    if np.isnan(sd).any() or not np.isclose(sd.sum(), 1.0):
        np.save("tmp/tps.npy", tps)

        tps = tps / tps.sum(1, keepdims=True)
        sd = _eigen_method(csr_matrix(tps))
        if not np.isclose(sd.sum(), 1.0) and np.isclose(sd.sum(), 1, rtol=4):
            sd /= sd.sum()
        assert not (np.isnan(sd).any() or not np.isclose(sd.sum(), 1.0)), np.save(
            "tmp/tps.npy", tps
        )
    return sd
def get_average_reward(T: numpy.ndarray, R: numpy.ndarray, policy: numpy.ndarray, next_states_and_probs: Optional, sparse_threshold_size: int = 250000) -> float:
Returns
- float: The expected average reward when following the given policy for the MDP defined by the given transition and rewards matrices.
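As a worked instance of the decomposition this function computes, using the illustrative two-state MDP from the sketch above: the policy-induced chain [[0.55, 0.45], [0.3, 0.7]] has stationary distribution (0.4, 0.6) and the per-state average rewards are (0.5, 1.0), so the expected average reward is 0.4 * 0.5 + 0.6 * 1.0 = 0.8.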
def get_average_rewards(R: numpy.ndarray, policy: numpy.ndarray) -> numpy.ndarray:
Returns
- np.ndarray: The expected rewards for each state when following the given policy.
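For example (illustrative values; the einsum "sa,sa->s" is equivalent to (R * policy).sum(axis=1)):

import numpy as np
from colosseum.mdp.utils.markov_chain import get_average_rewards

R = np.array([[1.0, 0.0], [0.0, 2.0]])   # R[s, a]
policy = np.full((2, 2), 0.5)            # uniformly random policy
print(get_average_rewards(R, policy))    # -> [0.5, 1.0]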
def get_transition_probabilities(T: numpy.ndarray, policy: numpy.ndarray) -> numpy.ndarray:
Returns
- np.ndarray: The transition probability matrix of the Markov chain yielded by the given policy.
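For example (same illustrative two-state MDP as above; each row of the result is a probability distribution over next states):

import numpy as np
from colosseum.mdp.utils.markov_chain import get_transition_probabilities

T = np.array([[[0.9, 0.1], [0.2, 0.8]],
              [[0.5, 0.5], [0.1, 0.9]]])   # T[s, a, s']
policy = np.full((2, 2), 0.5)
P = get_transition_probabilities(T, policy)
print(P)                                   # [[0.55, 0.45], [0.3, 0.7]]
assert np.allclose(P.sum(1), 1.0)          # rows sum to one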
def get_markov_chain(transition_probabilities: numpy.ndarray) -> pydtmc.markov_chain.MarkovChain:
Returns
- MarkovChain: The Markov chain object from the pydtmc package.
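A rough usage sketch; it assumes pydtmc's MarkovChain exposes the is_ergodic and pi properties for inspecting the chain, as in the pydtmc documentation (the transition matrix below is illustrative):

import numpy as np
from colosseum.mdp.utils.markov_chain import get_markov_chain

P = np.array([[0.55, 0.45], [0.3, 0.7]])
mc = get_markov_chain(P)
# pydtmc provides chain-level diagnostics, e.g. ergodicity and stationary distributions
print(mc.is_ergodic, mc.pi)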
def get_stationary_distribution(tps: numpy.ndarray, starting_states_and_probs: Iterable[Tuple[int, float]], sparse_threshold_size: int = 250000) -> numpy.ndarray:
Returns the stationary distribution of the transition matrix. If there are multiple recurrent classes, and thus multiple stationary distributions, the returned stationary distribution is the average of the per-class stationary distributions weighted by the starting-state distribution.
Parameters
- tps (np.ndarray): The transition probabilities matrix.
- starting_states_and_probs (Iterable[Tuple[int, float]]): The iterable over the starting states and their corresponding probabilities.
- sparse_threshold_size (int): The size of the transition probabilities matrix above which sparse matrices are used.
Returns
- np.ndarray: The stationary distribution of the transition matrix.
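A hedged sketch of the multi-recurrent-class case described above (the chain and the starting-state distribution are made up; the expected output follows from the weighting rule, assuming each starting state reaches exactly one recurrent class):

import numpy as np
from colosseum.mdp.utils.markov_chain import get_stationary_distribution

# Two recurrent classes: the absorbing state {0} and the pair {2, 3}; state 1 is transient.
tps = np.array([
    [1.0, 0.0, 0.0, 0.0],
    [0.5, 0.0, 0.5, 0.0],
    [0.0, 0.0, 0.5, 0.5],
    [0.0, 0.0, 0.5, 0.5],
])
# Start in state 0 with probability 0.3 and in state 3 with probability 0.7.
sd = get_stationary_distribution(tps, [(0, 0.3), (3, 0.7)])
print(sd)   # expected: [0.3, 0.0, 0.35, 0.35]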