colosseum.dynamic_programming.finite_horizon

from typing import Tuple

import numba
import numpy as np

from colosseum.dynamic_programming import DP_MAX_ITERATION
from colosseum.dynamic_programming.utils import DynamicProgrammingMaxIterationExceeded
from colosseum.dynamic_programming.utils import argmax_3d

@numba.njit()
def episodic_value_iteration(
    H: int, T: np.ndarray, R: np.ndarray, max_value: float = None
) -> Tuple[np.ndarray, np.ndarray]:
    # T[s, a, s'] is the transition kernel and R[s, a] the expected reward.
    n_states, n_actions, _ = T.shape

    # Q and V carry one extra time index: the H-th slice stays at zero and
    # serves as the terminal boundary condition for the backward recursion.
    Q = np.zeros((H + 1, n_states, n_actions), dtype=np.float32)
    V = np.zeros((H + 1, n_states), dtype=np.float32)
    # Backward induction from h = H - 1 down to h = 0.
    for i in range(H):
        h = H - i - 1
        for s in range(n_states):
            Q[h, s] = R[s] + T[s] @ V[h + 1]
            V[h, s] = Q[h, s].max()
            # Early exit that lets callers discard MDPs whose optimal value
            # exceeds the given threshold.
            if max_value is not None and V[h, s] > max_value:
                return None
    return Q, V


@numba.njit()
def episodic_policy_evaluation(
    H: int, T: np.ndarray, R: np.ndarray, policy: np.ndarray
) -> Tuple[np.ndarray, np.ndarray]:
    # policy[h, s] is a probability vector over actions at time step h in state s.
    n_states, n_actions, _ = T.shape

    Q = np.zeros((H + 1, n_states, n_actions), dtype=np.float32)
    V = np.zeros((H + 1, n_states), dtype=np.float32)
    # Same backward recursion as value iteration, but V averages Q under the
    # policy instead of maximising over actions.
    for i in range(H):
        h = H - i - 1
        for s in range(n_states):
            Q[h, s] = R[s] + T[s] @ V[h + 1]
            V[h, s] = (Q[h, s] * policy[h, s]).sum()
    return Q, V


def episodic_policy_iteration(
    H: int, T: np.ndarray, R: np.ndarray
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    n_states, n_actions, _ = T.shape

    # Start from the greedy policy of a random Q and alternate exact policy
    # evaluation with greedy improvement until the policy stops changing.
    # Finite-horizon evaluation is exact, so no discount factor or convergence
    # tolerance is required.
    Q = np.random.rand(H, n_states, n_actions)
    pi = argmax_3d(Q)
    for t in range(DP_MAX_ITERATION):
        old_pi = pi.copy()
        Q, V = episodic_policy_evaluation(H, T, R, pi)
        # The H-th slice of Q is the all-zero terminal boundary; improve only
        # over the first H time steps.
        pi = argmax_3d(Q[:H])
        if (pi != old_pi).sum() == 0:
            return Q, V, pi
    raise DynamicProgrammingMaxIterationExceeded()
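
Taken together, the module expects a time-homogeneous transition kernel T of shape (n_states, n_actions, n_states) and a reward matrix R of shape (n_states, n_actions). The snippet below is a minimal usage sketch on a randomly generated episodic MDP; the concrete sizes, the float32 casts, and the uniformly random policy are illustrative assumptions, not part of the colosseum API.

import numpy as np

from colosseum.dynamic_programming.finite_horizon import (
    episodic_policy_evaluation,
    episodic_value_iteration,
)

H, n_states, n_actions = 5, 4, 2
rng = np.random.default_rng(0)

# Random transition kernel T[s, a, s'] (rows normalised to probabilities) and
# rewards R[s, a], cast to float32 to match the float32 value arrays above.
T = rng.random((n_states, n_actions, n_states))
T = (T / T.sum(-1, keepdims=True)).astype(np.float32)
R = rng.random((n_states, n_actions)).astype(np.float32)

# Optimal state-action and state values for every in-episode time step.
Q_star, V_star = episodic_value_iteration(H, T, R)
print(Q_star.shape, V_star.shape)  # (6, 4, 2) (6, 4)

# Value of the uniformly random policy on the same MDP.
uniform_pi = np.full((H, n_states, n_actions), 1.0 / n_actions, dtype=np.float32)
Q_pi, V_pi = episodic_policy_evaluation(H, T, R, uniform_pi)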