colosseum.hardness.measures.value_norm

import time

import numpy as np
from tqdm import trange

from colosseum import config
from colosseum.dynamic_programming import discounted_value_iteration
from colosseum.dynamic_programming.infinite_horizon import discounted_policy_evaluation
from colosseum.dynamic_programming.utils import get_policy_from_q_values
from colosseum.mdp.utils.markov_chain import get_average_rewards
from colosseum.mdp.utils.markov_chain import get_transition_probabilities


def get_value_norm(
    T: np.ndarray,
    R: np.ndarray,
    discount: bool,
    policy: np.ndarray = None,
) -> float:
    """
    Computes the environmental value norm.

    Parameters
    ----------
    T : np.ndarray
        The transition matrix.
    R : np.ndarray
        The reward matrix.
    discount : bool
        Whether to compute the environmental value norm in the discounted or undiscounted formulation.
    policy : np.ndarray, optional
        The policy for which the environmental value norm is computed. By default, the optimal policy is used.

    Returns
    -------
    float
        The environmental value norm.
    """

    if discount:
        # Obtain the state values of the optimal policy (value iteration) or of the given policy.
        Q, V = (
            discounted_value_iteration(T, R)
            if policy is None
            else discounted_policy_evaluation(T, R, policy)
        )
        return calculate_norm_discounted(T, V)

    # Undiscounted case: default to the greedy policy w.r.t. the optimal Q-values.
    if policy is None:
        policy = get_policy_from_q_values(discounted_value_iteration(T, R)[0], True)
    tps = get_transition_probabilities(T, policy)
    ars = get_average_rewards(R, policy)
    return calculate_norm_average(T, tps, ars)


def _expected_value(f, ni):
    # Fall back to a sparse matrix product when the weights are mostly zero.
    if np.isclose(ni, 0).mean() > 0.9:
        import sparse

        ni_sparse = sparse.COO(ni)
        return ni_sparse @ f
    return np.einsum("iaj,j->ia", ni, f)


def _calculate_gain(tps, average_rewards, steps):
    P_star = np.linalg.matrix_power(tps, steps)
    return P_star @ average_rewards


def _calculate_bias(tps, average_rewards, steps=1000):
    n_states = len(tps)

    gain = _calculate_gain(tps, average_rewards, steps)

    # Accumulate the bias as a truncated series, stopping after `steps` terms
    # or after a one-minute time budget.
    h = np.zeros((n_states,))
    P_i = np.eye(n_states)
    start = time.time()
    for i in trange(steps, desc="gain") if config.VERBOSE_LEVEL > 0 else range(steps):
        h += P_i @ (average_rewards - gain)
        P_i = P_i @ tps
        if time.time() - start > 60:
            break
    return h


def calculate_norm_discounted(T, V):
    Ev = _expected_value(V, T)
    return np.sqrt(np.einsum("iaj,ja->ia", T, (V.reshape(-1, 1) - Ev) ** 2)).max()


def calculate_norm_average(T, tps, average_rewards, steps=1000):
    h = _calculate_bias(tps, average_rewards, steps)
    Eh = _expected_value(h, T)
    return np.sqrt(np.einsum("iaj,ja->ia", T, (h.reshape(-1, 1) - Eh) ** 2)).max()
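A note on the undiscounted helpers above (this is a reading of the code, not additional library documentation): _calculate_gain approximates the gain as g = P^steps · r̄, where P is the policy-induced transition matrix (tps) and r̄ the vector of per-state average rewards, and _calculate_bias approximates the bias by the truncated series h ≈ Σ_{t=0}^{steps−1} P^t (r̄ − g), with the loop additionally cut off after roughly one minute of wall-clock time.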
def get_value_norm(T: numpy.ndarray, R: numpy.ndarray, discount: bool, policy: numpy.ndarray = None) -> float:

Computes the environmental value norm.

Parameters
  • T (np.ndarray): The transition matrix.
  • R (np.ndarray): The reward matrix.
  • discount (bool): Whether to compute the environmental value norm in the discounted or undiscounted formulation.
  • policy (np.ndarray, optional): The policy for which the environmental value norm is computed. By default, the optimal policy is used.
Returns
  • float: The environmental value norm.
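
A minimal usage sketch (not taken from the library's documentation). The shapes are assumptions inferred from the source: T of shape (num_states, num_actions, num_states) with next-state probabilities summing to one, and R of shape (num_states, num_actions).

import numpy as np
from colosseum.hardness.measures.value_norm import get_value_norm

rng = np.random.default_rng(0)
n_states, n_actions = 5, 2
T = rng.random((n_states, n_actions, n_states))
T /= T.sum(-1, keepdims=True)          # normalize into valid transition probabilities (assumed layout)
R = rng.random((n_states, n_actions))  # assumed layout: one expected reward per (state, action) pair

print(get_value_norm(T, R, discount=True))   # discounted formulation, optimal policy
print(get_value_norm(T, R, discount=False))  # undiscounted (average-reward) formulation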
def calculate_norm_discounted(T, V):
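
For readability, a hypothetical loop-based rendering of the quantity calculate_norm_discounted computes, transcribed from the einsum in the source (ignoring the sparse fast path in _expected_value); the shapes follow the same assumptions as above and the function name is illustrative only.

import numpy as np

def norm_discounted_loops(T, V):
    # Ev[j, a] = sum_k T[j, a, k] * V[k]: expected next-state value from state j under action a.
    Ev = np.einsum("iaj,j->ia", T, V)
    n_states, n_actions, _ = T.shape
    worst = 0.0
    for i in range(n_states):
        for a in range(n_actions):
            # Transition-weighted squared deviation, mirroring the einsum "iaj,ja->ia" above.
            dev = sum(T[i, a, j] * (V[j] - Ev[j, a]) ** 2 for j in range(n_states))
            worst = max(worst, float(np.sqrt(dev)))
    return worst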
def calculate_norm_average(T, tps, average_rewards, steps=1000):
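
And a hedged sketch of how calculate_norm_average is fed in the undiscounted branch of get_value_norm, reusing T and R from the usage sketch above; the shape comments are inferred from the source rather than documented.

from colosseum.dynamic_programming import discounted_value_iteration
from colosseum.dynamic_programming.utils import get_policy_from_q_values
from colosseum.hardness.measures.value_norm import calculate_norm_average
from colosseum.mdp.utils.markov_chain import get_average_rewards, get_transition_probabilities

Q = discounted_value_iteration(T, R)[0]
policy = get_policy_from_q_values(Q, True)     # greedy policy w.r.t. the optimal Q-values
tps = get_transition_probabilities(T, policy)  # state-to-state chain induced by the policy
ars = get_average_rewards(R, policy)           # per-state expected reward under the policy
value_norm = calculate_norm_average(T, tps, ars, steps=1000)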