colosseum.dynamic_programming.finite_horizon
from typing import Tuple

import numba
import numpy as np

from colosseum.dynamic_programming import DP_MAX_ITERATION
from colosseum.dynamic_programming.utils import DynamicProgrammingMaxIterationExceeded
from colosseum.dynamic_programming.utils import argmax_3d


@numba.njit()
def episodic_value_iteration(
    H: int, T: np.ndarray, R: np.ndarray, max_value: float = None
) -> Tuple[np.ndarray, np.ndarray]:
    n_states, n_actions, _ = T.shape

    Q = np.zeros((H + 1, n_states, n_actions), dtype=np.float32)
    V = np.zeros((H + 1, n_states), dtype=np.float32)
    for i in range(H):
        h = H - i - 1
        for s in range(n_states):
            Q[h, s] = R[s] + T[s] @ V[h + 1]
            V[h, s] = Q[h, s].max()
            if max_value is not None and V[h, s] > max_value:
                return None
    return Q, V


@numba.njit()
def episodic_policy_evaluation(
    H: int, T: np.ndarray, R: np.ndarray, policy: np.ndarray
) -> Tuple[np.ndarray, np.ndarray]:
    n_states, n_actions, _ = T.shape

    Q = np.zeros((H + 1, n_states, n_actions), dtype=np.float32)
    V = np.zeros((H + 1, n_states), dtype=np.float32)
    for i in range(H):
        h = H - i - 1
        for s in range(n_states):
            Q[h, s] = R[s] + T[s] @ V[h + 1]
            V[h, s] = (Q[h, s] * policy[h, s]).sum()
    return Q, V


def episodic_policy_iteration(T: np.ndarray, R: np.ndarray, gamma=0.99, epsilon=1e-7):
    H, n_states, n_actions, _ = T.shape

    Q = np.random.rand(H, n_states, n_actions)
    pi = argmax_3d(Q)
    for t in range(DP_MAX_ITERATION):
        old_pi = pi.copy()
        Q, V = episodic_policy_evaluation(T, R, pi, gamma, epsilon)
        pi = argmax_3d(Q)
        if (pi != old_pi).sum() == 0:
            return Q, V, pi
    raise DynamicProgrammingMaxIterationExceeded()
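The two numba-compiled routines work on a stationary MDP: a transition kernel T of shape (n_states, n_actions, n_states) whose rows T[s, a] are distributions over next states, and an expected-reward matrix R of shape (n_states, n_actions). The returned Q and V arrays are indexed by the in-episode time step h, with an extra terminal row at index H that stays zero. episodic_policy_iteration instead unpacks a time-indexed kernel of shape (H, n_states, n_actions, n_states). A minimal sketch of inputs with these shapes (the array names and sizes below are illustrative, not part of the module):

import numpy as np

H, n_states, n_actions = 10, 5, 3

# Stationary row-stochastic kernel and reward matrix, cast to float32 to match
# the float32 buffers allocated inside the compiled routines.
T = np.random.rand(n_states, n_actions, n_states).astype(np.float32)
T /= T.sum(-1, keepdims=True)
R = np.random.rand(n_states, n_actions).astype(np.float32)

# Time-indexed kernel of shape (H, n_states, n_actions, n_states), the layout
# unpacked by episodic_policy_iteration.
T_h = np.repeat(T[None], H, axis=0)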
@numba.njit()
def episodic_value_iteration(H: int, T: numpy.ndarray, R: numpy.ndarray, max_value: float = None) -> Tuple[numpy.ndarray, numpy.ndarray]:
@numba.njit()
def episodic_value_iteration(
    H: int, T: np.ndarray, R: np.ndarray, max_value: float = None
) -> Tuple[np.ndarray, np.ndarray]:
    n_states, n_actions, _ = T.shape

    Q = np.zeros((H + 1, n_states, n_actions), dtype=np.float32)
    V = np.zeros((H + 1, n_states), dtype=np.float32)
    for i in range(H):
        h = H - i - 1
        for s in range(n_states):
            Q[h, s] = R[s] + T[s] @ V[h + 1]
            V[h, s] = Q[h, s].max()
            if max_value is not None and V[h, s] > max_value:
                return None
    return Q, V
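A minimal usage sketch on an illustrative random MDP: backward induction over a horizon H, reading off the optimal first-stage quantities. Note that when max_value is given and a backed-up value exceeds it, the function returns None instead of the (Q, V) pair, so callers passing max_value should check for None before unpacking.

import numpy as np

from colosseum.dynamic_programming.finite_horizon import episodic_value_iteration

H, n_states, n_actions = 10, 5, 3
T = np.random.rand(n_states, n_actions, n_states).astype(np.float32)
T /= T.sum(-1, keepdims=True)              # row-stochastic transition kernel
R = np.random.rand(n_states, n_actions).astype(np.float32)

Q, V = episodic_value_iteration(H, T, R)
print(Q.shape, V.shape)                    # (H + 1, n_states, n_actions), (H + 1, n_states)
greedy_actions = Q[0].argmax(-1)           # optimal actions at the first time step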
@numba.njit()
def episodic_policy_evaluation(H: int, T: numpy.ndarray, R: numpy.ndarray, policy: numpy.ndarray) -> Tuple[numpy.ndarray, numpy.ndarray]:
@numba.njit()
def episodic_policy_evaluation(
    H: int, T: np.ndarray, R: np.ndarray, policy: np.ndarray
) -> Tuple[np.ndarray, np.ndarray]:
    n_states, n_actions, _ = T.shape

    Q = np.zeros((H + 1, n_states, n_actions), dtype=np.float32)
    V = np.zeros((H + 1, n_states), dtype=np.float32)
    for i in range(H):
        h = H - i - 1
        for s in range(n_states):
            Q[h, s] = R[s] + T[s] @ V[h + 1]
            V[h, s] = (Q[h, s] * policy[h, s]).sum()
    return Q, V
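The policy argument is a per-step action distribution of shape (H, n_states, n_actions), inferred from the elementwise product with Q[h, s] above; the state value at each (h, s) is the policy-weighted average of the action values. A minimal sketch evaluating a uniformly random policy on an illustrative random MDP (array names are not part of the module):

import numpy as np

from colosseum.dynamic_programming.finite_horizon import episodic_policy_evaluation

H, n_states, n_actions = 10, 5, 3
T = np.random.rand(n_states, n_actions, n_states).astype(np.float32)
T /= T.sum(-1, keepdims=True)
R = np.random.rand(n_states, n_actions).astype(np.float32)

# Uniform action distribution at every (h, s).
uniform = np.full((H, n_states, n_actions), 1.0 / n_actions, dtype=np.float32)

Q_pi, V_pi = episodic_policy_evaluation(H, T, R, uniform)
print(V_pi[0])   # value of the uniform policy at the first time step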
def episodic_policy_iteration(T: numpy.ndarray, R: numpy.ndarray, gamma=0.99, epsilon=1e-07):
def episodic_policy_iteration(T: np.ndarray, R: np.ndarray, gamma=0.99, epsilon=1e-7):
    H, n_states, n_actions, _ = T.shape

    Q = np.random.rand(H, n_states, n_actions)
    pi = argmax_3d(Q)
    for t in range(DP_MAX_ITERATION):
        old_pi = pi.copy()
        Q, V = episodic_policy_evaluation(T, R, pi, gamma, epsilon)
        pi = argmax_3d(Q)
        if (pi != old_pi).sum() == 0:
            return Q, V, pi
    raise DynamicProgrammingMaxIterationExceeded()
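episodic_policy_iteration expects the time-indexed kernel of shape (H, n_states, n_actions, n_states) and, as listed, forwards gamma and epsilon to episodic_policy_evaluation, whose signature above is (H, T, R, policy). The sketch below is therefore a hand-rolled policy-iteration loop for the stationary-kernel case: it alternates episodic_policy_evaluation with a one-hot greedy improvement step built with plain NumPy (rather than argmax_3d), and stops when the policy is stable. It illustrates the scheme; it is not a call into the module's own loop.

import numpy as np

from colosseum.dynamic_programming.finite_horizon import (
    episodic_policy_evaluation,
    episodic_value_iteration,
)

H, n_states, n_actions = 10, 5, 3
T = np.random.rand(n_states, n_actions, n_states).astype(np.float32)
T /= T.sum(-1, keepdims=True)
R = np.random.rand(n_states, n_actions).astype(np.float32)

# Start from an arbitrary deterministic policy, encoded one-hot per (h, s).
pi = np.zeros((H, n_states, n_actions), dtype=np.float32)
pi[..., 0] = 1.0

for _ in range(100):                      # illustrative iteration cap
    Q, V = episodic_policy_evaluation(H, T, R, pi)
    greedy = np.eye(n_actions, dtype=np.float32)[Q[:H].argmax(-1)]  # one-hot greedy policy
    if np.array_equal(greedy, pi):        # policy stable: pi is optimal
        break
    pi = greedy

# At the fixed point the policy's value matches backward-induction value iteration.
Q_star, V_star = episodic_value_iteration(H, T, R)
assert np.allclose(V[0], V_star[0], atol=1e-4)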