colosseum.agent.agents.infinite_horizon.ucrl2
import math
import os
from typing import TYPE_CHECKING, Any, Callable, Dict, Union

import dm_env
import gin
import numpy as np
from ray import tune

from colosseum.agent.actors import QValuesActor
from colosseum.agent.agents.base import BaseAgent
from colosseum.dynamic_programming import discounted_value_iteration
from colosseum.dynamic_programming.infinite_horizon import extended_value_iteration
from colosseum.dynamic_programming.utils import get_policy_from_q_values
from colosseum.emission_maps import EmissionMap

if TYPE_CHECKING:
    from colosseum.mdp import ACTION_TYPE
    from colosseum.utils.acme.specs import MDPSpec


def _chernoff(it, N, delta, sqrt_C, log_C, range=1.0):
    ci = range * np.sqrt(sqrt_C * math.log(log_C * (it + 1) / delta) / np.maximum(1, N))
    return ci


def bernstein(scale_a, log_scale_a, scale_b, log_scale_b, alpha_1, alpha_2):
    A = scale_a * math.log(log_scale_a)
    B = scale_b * math.log(log_scale_b)
    return alpha_1 * np.sqrt(A) + alpha_2 * B


@gin.configurable
class UCRL2Continuous(BaseAgent):
    """
    The second version of the upper confidence reinforcement learning algorithm (UCRL2).

    Auer, Peter, Thomas Jaksch, and Ronald Ortner. "Near-optimal regret bounds for reinforcement learning."
    Advances in Neural Information Processing Systems 21 (2008).

    Fruit, Ronan, Matteo Pirotta, and Alessandro Lazaric. "Improved analysis of UCRL2 with empirical
    Bernstein inequality." arXiv preprint arXiv:2007.05456 (2020).
    """

    @staticmethod
    def is_emission_map_accepted(emission_map: "EmissionMap") -> bool:
        return emission_map.is_tabular

    @staticmethod
    def produce_gin_file_from_parameters(parameters: Dict[str, Any], index: int = 0):
        string = f"prms_{index}/UCRL2Continuous.bound_type_p='bernstein'\n"
        for k, v in parameters.items():
            string += f"prms_{index}/UCRL2Continuous.{k} = {v}\n"
        return string[:-1]

    @staticmethod
    def is_episodic() -> bool:
        return False

    @staticmethod
    def get_hyperparameters_search_spaces() -> Dict[str, tune.sample.Domain]:
        return {"alpha_p": tune.uniform(0.1, 3), "alpha_r": tune.uniform(0.1, 3)}

    @staticmethod
    def get_agent_instance_from_parameters(
        seed: int,
        optimization_horizon: int,
        mdp_specs: "MDPSpec",
        parameters: Dict[str, Any],
    ) -> "BaseAgent":
        return UCRL2Continuous(
            mdp_specs=mdp_specs,
            seed=seed,
            optimization_horizon=optimization_horizon,
            alpha_p=parameters["alpha_p"],
            alpha_r=parameters["alpha_r"],
            bound_type_p="bernstein",
        )

    @property
    def current_optimal_stochastic_policy(self) -> np.ndarray:
        Q, _ = discounted_value_iteration(self.P, self.estimated_rewards)
        return get_policy_from_q_values(Q, True)

    def __init__(
        self,
        seed: int,
        mdp_specs: "MDPSpec",
        optimization_horizon: int,
        # MDP model parameters
        alpha_r=1.0,
        alpha_p=1.0,
        bound_type_p="_chernoff",
        bound_type_rew="_chernoff",
        # Actor parameters
        epsilon_greedy: Union[float, Callable] = None,
        boltzmann_temperature: Union[float, Callable] = None,
    ):
        r"""
        Parameters
        ----------
        seed : int
            The random seed.
        mdp_specs : MDPSpec
            The full specification of the MDP.
        optimization_horizon : int
            The total number of interactions that the agent is expected to have with the MDP.
        alpha_r : float
            The :math:`\alpha` parameter for the rewards. By default, it is set to one.
        alpha_p : float
            The :math:`\alpha` parameter for the transitions. By default, it is set to one.
        bound_type_p : str
            The upper confidence bound type for the transitions. It can either be '_chernoff' or
            'bernstein'. By default, it is set to '_chernoff'.
        bound_type_rew : str
            The upper confidence bound type for the rewards. It can either be '_chernoff' or
            'bernstein'. By default, it is set to '_chernoff'.
        epsilon_greedy : Union[float, Callable], optional
            The probability of selecting an action at random. It can be provided as a float or as a
            function of the total number of interactions. By default, the probability is set to zero.
        boltzmann_temperature : Union[float, Callable], optional
            The parameter that controls the Boltzmann exploration. It can be provided as a float or
            as a function of the total number of interactions. By default, Boltzmann exploration is
            disabled.
        """

        n_states = self._n_states = mdp_specs.observations.num_values
        n_actions = self._n_actions = mdp_specs.actions.num_values
        self.reward_range = mdp_specs.rewards_range
        # Maximum reward, used by the Bernstein confidence bound on the rewards.
        self.r_max = mdp_specs.rewards_range[1]

        assert bound_type_p in ["_chernoff", "bernstein"]
        assert bound_type_rew in ["_chernoff", "bernstein"]

        self.alpha_p = alpha_p
        self.alpha_r = alpha_r

        # initialize matrices
        self.policy = np.zeros((n_states,), dtype=np.int_)
        self.policy_indices = np.zeros((n_states,), dtype=np.int_)

        # initialization
        self.iteration = 0
        self.episode = 0
        self.delta = 1.0  # confidence
        self.bound_type_p = bound_type_p
        self.bound_type_rew = bound_type_rew

        self.P = np.ones((n_states, n_actions, n_states), np.float32) / n_states

        self.estimated_rewards = (
            np.ones((n_states, n_actions), np.float32) * mdp_specs.rewards_range[1]
        )
        self.variance_proxy_reward = np.zeros((n_states, n_actions), np.float32)
        self.estimated_holding_times = np.ones((n_states, n_actions), np.float32)
        self.N = np.zeros((n_states, n_actions, n_states), dtype=np.int32)

        self.current_state = None
        self.artificial_episode = 0
        self.episode_reward_data = dict()
        self.episode_transition_data = dict()

        super(UCRL2Continuous, self).__init__(
            seed,
            mdp_specs,
            None,
            QValuesActor(seed, mdp_specs, epsilon_greedy, boltzmann_temperature),
            optimization_horizon,
        )

    def is_episode_end(
        self,
        ts_t: dm_env.TimeStep,
        a_t: "ACTION_TYPE",
        ts_tp1: dm_env.TimeStep,
        time: int,
    ) -> bool:
        nu_k = len(self.episode_transition_data[ts_t.observation, a_t])
        return nu_k >= max(1, self.N[ts_t.observation, a_t].sum() - nu_k)

    def episode_end_update(self):
        self.episode += 1
        self.delta = 1 / math.sqrt(self.iteration + 1)

        new_sp = self.solve_optimistic_model()
        if new_sp is not None:
            self.span_value = new_sp / self.reward_range[1]

        if len(self.episode_transition_data) > 0:
            self.model_update()
            self.episode_reward_data = dict()
            self.episode_transition_data = dict()

    def before_start_interacting(self):
        self.episode_end_update()

    def step_update(
        self, ts_t: dm_env.TimeStep, a_t: "ACTION_TYPE", ts_tp1: dm_env.TimeStep, h: int
    ):
        self.N[ts_t.observation, a_t, ts_tp1.observation] += 1

        if (ts_t.observation, a_t) in self.episode_reward_data:
            self.episode_reward_data[ts_t.observation, a_t].append(ts_tp1.reward)
            if not ts_tp1.last():
                self.episode_transition_data[ts_t.observation, a_t].append(
                    ts_tp1.observation
                )
        else:
            self.episode_reward_data[ts_t.observation, a_t] = [ts_tp1.reward]
            if not ts_tp1.last():
                self.episode_transition_data[ts_t.observation, a_t] = [
                    ts_tp1.observation
                ]

    def model_update(self):
        """
        updates the model given the transitions obtained during the artificial episode.
        """
        for (s_tm1, action), r_ts in self.episode_reward_data.items():
            # updated observations
            scale_f = self.N[s_tm1, action].sum()
            for r in r_ts:
                # update the number of total iterations
                self.iteration += 1

                # update reward and variance estimate
                scale_f += 1
                old_estimated_reward = self.estimated_rewards[s_tm1, action]
                self.estimated_rewards[s_tm1, action] *= scale_f / (scale_f + 1.0)
                self.estimated_rewards[s_tm1, action] += r / (scale_f + 1.0)
                self.variance_proxy_reward[s_tm1, action] += (
                    r - old_estimated_reward
                ) * (r - self.estimated_rewards[s_tm1, action])

                # update holding time
                self.estimated_holding_times[s_tm1, action] *= scale_f / (scale_f + 1.0)
                self.estimated_holding_times[s_tm1, action] += 1 / (scale_f + 1)

        for (s_tm1, action) in set(self.episode_transition_data.keys()):
            self.P[s_tm1, action] = self.N[s_tm1, action] / self.N[s_tm1, action].sum()

    def beta_r(self, nb_observations) -> np.ndarray:
        """
        calculates the confidence bounds on the reward.

        Returns
        -------
        np.array
            The confidence bounds on the reward function (|S| x |A|).
        """
        S = self._n_states
        A = self._n_actions
        if self.bound_type_rew != "bernstein":
            ci = _chernoff(
                it=self.iteration,
                N=nb_observations,
                range=self.reward_range[1],
                delta=self.delta,
                sqrt_C=3.5,
                log_C=2 * S * A,
            )
            return self.alpha_r * ci
        else:
            N = np.maximum(1, nb_observations)
            Nm1 = np.maximum(1, nb_observations - 1)
            var_r = self.variance_proxy_reward / Nm1
            log_value = 2.0 * S * A * (self.iteration + 1) / self.delta
            beta = bernstein(
                scale_a=14 * var_r / N,
                log_scale_a=log_value,
                scale_b=49.0 * self.r_max / (3.0 * Nm1),
                log_scale_b=log_value,
                alpha_1=math.sqrt(self.alpha_r),
                alpha_2=self.alpha_r,
            )
            return beta

    def beta_p(self, nb_observations) -> np.ndarray:
        """
        calculates the confidence bounds on the transition probabilities.

        Returns
        -------
        np.array
            The confidence bounds on the transition probabilities (|S| x |A| x 1 or |S| x |A| x |S|).
        """
        S = self._n_states
        A = self._n_actions
        if self.bound_type_p != "bernstein":
            beta = _chernoff(
                it=self.iteration,
                N=nb_observations,
                range=1.0,
                delta=self.delta,
                sqrt_C=14 * S,
                log_C=2 * A,
            )
            return self.alpha_p * beta.reshape([S, A, 1])
        else:
            N = np.maximum(1, nb_observations)
            Nm1 = np.maximum(1, nb_observations - 1)
            var_p = self.P * (1.0 - self.P)
            log_value = 2.0 * S * A * (self.iteration + 1) / self.delta
            beta = bernstein(
                scale_a=14 * var_p / N[:, :, np.newaxis],
                log_scale_a=log_value,
                scale_b=49.0 / (3.0 * Nm1[:, :, np.newaxis]),
                log_scale_b=log_value,
                alpha_1=math.sqrt(self.alpha_p),
                alpha_2=self.alpha_p,
            )
            return beta

    def solve_optimistic_model(self) -> Union[None, float]:
        """
        solves the optimistic value iteration.

        Returns
        -------
        Union[None, float]
            The span value of the estimates from the optimistic value iteration or None if no
            solution has been found.
        """
        nb_observations = self.N.sum(-1)

        beta_r = self.beta_r(nb_observations)  # confidence bounds on rewards
        beta_p = self.beta_p(
            nb_observations
        )  # confidence bounds on transition probabilities

        T = self.P
        estimated_rewards = self.estimated_rewards

        assert np.isclose(T.sum(-1), 1.0).all()
        try:
            res = extended_value_iteration(
                T, estimated_rewards, beta_r, beta_p, self.reward_range[1]
            )
        except SystemError:
            # Debug logs if the optimistic value iteration fails
            os.makedirs(f"tmp{os.sep}error_ext_vi", exist_ok=True)
            for i in range(100):
                if not os.path.isfile(f"tmp{os.sep}error_ext_vi{os.sep}T{i}.npy"):
                    np.save(f"tmp{os.sep}error_ext_vi{os.sep}T{i}.npy", T)
                    np.save(
                        f"tmp{os.sep}error_ext_vi{os.sep}estimated_rewards{i}.npy",
                        estimated_rewards,
                    )
                    np.save(f"tmp{os.sep}error_ext_vi{os.sep}beta_r{i}.npy", beta_r)
                    np.save(f"tmp{os.sep}error_ext_vi{os.sep}beta_p{i}.npy", beta_p)
                    break
            res = None

        if res is not None:
            span_value_new, self.Q, self.V = res
            span_value = span_value_new
            self._actor.set_q_values(self.Q)

            assert span_value >= 0, "The span value cannot be lower than zero"
            assert np.abs(span_value - span_value_new) < 1e-8

            return span_value
        return None
The second version of the upper confidence reinforcement learning algorithm (UCRL2).
Auer, Peter, Thomas Jaksch, and Ronald Ortner. "Near-optimal regret bounds for reinforcement learning." Advances in Neural Information Processing Systems 21 (2008).
Fruit, Ronan, Matteo Pirotta, and Alessandro Lazaric. "Improved analysis of UCRL2 with empirical Bernstein inequality." arXiv preprint arXiv:2007.05456 (2020).
def __init__(
    self,
    seed: int,
    mdp_specs: "MDPSpec",
    optimization_horizon: int,
    # MDP model parameters
    alpha_r=1.0,
    alpha_p=1.0,
    bound_type_p="_chernoff",
    bound_type_rew="_chernoff",
    # Actor parameters
    epsilon_greedy: Union[float, Callable] = None,
    boltzmann_temperature: Union[float, Callable] = None,
)
Parameters
- seed (int): The random seed.
- mdp_specs (MDPSpec): The full specification of the MDP.
- optimization_horizon (int): The total number of interactions that the agent is expected to have with the MDP.
- alpha_r (float): The \( \alpha \) parameter for the rewards. By default, it is set to one.
- alpha_p (float): The \( \alpha \) parameter for the transitions. By default, it is set to one.
- bound_type_p (str): The upper confidence bound type for the transitions. It can either be '_chernoff' or 'bernstein'. By default, it is set to '_chernoff'.
- bound_type_rew (str): The upper confidence bound type for the rewards. It can either be '_chernoff' or 'bernstein'. By default, it is set to '_chernoff'.
- epsilon_greedy (Union[float, Callable], optional): The probability of selecting an action at random. It can be provided as a float or as a function of the total number of interactions. By default, the probability is set to zero.
- boltzmann_temperature (Union[float, Callable], optional): The parameter that controls the Boltzmann exploration. It can be provided as a float or as a function of the total number of interactions. By default, Boltzmann exploration is disabled.
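A minimal construction sketch using the parameters above; `mdp_specs` is assumed to be an MDPSpec obtained elsewhere from a tabular colosseum MDP, and the epsilon schedule is purely illustrative:

import math

# `mdp_specs` is a hypothetical MDPSpec instance obtained from a tabular MDP elsewhere.
agent = UCRL2Continuous(
    seed=42,
    mdp_specs=mdp_specs,
    optimization_horizon=100_000,
    alpha_r=1.0,
    alpha_p=1.0,
    bound_type_p="bernstein",
    # schedules may also be given as callables of the total number of interactions
    epsilon_greedy=lambda n: 0.1 / math.sqrt(n + 1),
)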
@staticmethod
def is_emission_map_accepted(emission_map: "EmissionMap") -> bool
Returns
- bool: True if the agent class accepts the emission map.
@staticmethod
def produce_gin_file_from_parameters(parameters: Dict[str, Any], index: int = 0)
produces the gin configuration string corresponding to the given parameters.
Parameters
- parameters (Dict[str, Any]): The dictionary containing the parameters of the agent.
- index (int): The index assigned to the gin configuration.
Returns
- gin_config (str): The gin configuration file.
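As a concrete example of the string produced by the implementation above, a dictionary with the two alpha parameters yields the following gin bindings (the bound_type_p binding is always prepended):

gin_config = UCRL2Continuous.produce_gin_file_from_parameters(
    {"alpha_p": 0.5, "alpha_r": 1.5}, index=0
)
print(gin_config)
# prms_0/UCRL2Continuous.bound_type_p='bernstein'
# prms_0/UCRL2Continuous.alpha_p = 0.5
# prms_0/UCRL2Continuous.alpha_r = 1.5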
@staticmethod
def is_episodic() -> bool

Returns
- bool: True if the agent is suited for the episodic setting. UCRL2Continuous returns False, as it targets the infinite-horizon (continuing) setting.
@staticmethod
def get_hyperparameters_search_spaces() -> Dict[str, tune.sample.Domain]
Returns
- Dict[str, tune.sample.Domain]: A dictionary mapping hyperparameter names to the corresponding ray.tune samplers.
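A small sketch of drawing one candidate configuration from these spaces; this assumes Ray Tune's Domain.sample() helper, while in practice a ray.tune search algorithm would do the sampling:

spaces = UCRL2Continuous.get_hyperparameters_search_spaces()
candidate = {name: domain.sample() for name, domain in spaces.items()}
# e.g. {'alpha_p': 1.73, 'alpha_r': 0.42}, with both values drawn uniformly from [0.1, 3]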
@staticmethod
def get_agent_instance_from_parameters(
    seed: int,
    optimization_horizon: int,
    mdp_specs: "MDPSpec",
    parameters: Dict[str, Any],
) -> "BaseAgent"
returns an agent instance for the given MDP specification and agent parameters.
Parameters
- seed (int): The random seed.
- optimization_horizon (int): The total number of interactions that the agent is expected to have with the MDP.
- mdp_specs (MDPSpec): The full specification of the MDP.
- parameters (Dict[str, Any]): The dictionary containing the parameters of the agent.
Returns
- BaseAgent: The agent instance.
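Tying the helpers together, a sampled candidate can be turned into an agent instance; as above, `mdp_specs` is a hypothetical MDPSpec obtained elsewhere:

agent = UCRL2Continuous.get_agent_instance_from_parameters(
    seed=0,
    optimization_horizon=50_000,
    mdp_specs=mdp_specs,  # hypothetical MDPSpec
    parameters={"alpha_p": 1.0, "alpha_r": 1.0},
)
# Equivalent to calling the constructor directly with bound_type_p="bernstein".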
@property
def current_optimal_stochastic_policy(self) -> np.ndarray

Returns
- np.ndarray: The agent's current estimate of an optimal policy, given its knowledge of the MDP so far, expressed as a distribution over actions for each state.
def is_episode_end(
    self,
    ts_t: dm_env.TimeStep,
    a_t: "ACTION_TYPE",
    ts_tp1: dm_env.TimeStep,
    time: int,
) -> bool
checks whether the artificial episode has terminated. For UCRL2Continuous, an artificial episode ends when the number of visits to the current state-action pair within the episode reaches the number of visits accumulated before the episode started (the standard UCRL2 doubling criterion); a worked example follows the parameter list below.
Parameters
- ts_t (dm_env.TimeStep): The TimeStep at time t.
- a_t ("ACTION_TYPE"): The action taken by the agent at time t.
- ts_tp1 (dm_env.TimeStep): The TimeStep at time t + 1.
- time (int): The current time of the environment. In the episodic case, this refers to the in-episode time, whereas in the continuous case this refers to the total number of previous interactions.
Returns
- bool: True if the episode terminated at time t+1.
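A worked illustration of the criterion implemented above, with a hypothetical helper that mirrors is_episode_end: if a state-action pair had been visited 8 times before the current artificial episode, the episode ends once it has been visited 8 more times within it.

# nu_k: visits to (s, a) within the current artificial episode
# n_total: all recorded visits to (s, a), including those of the current episode
def doubling_criterion(nu_k: int, n_total: int) -> bool:
    prior_visits = n_total - nu_k  # visits accumulated before the episode started
    return nu_k >= max(1, prior_visits)

assert doubling_criterion(nu_k=7, n_total=15) is False  # 7 < 8 prior visits
assert doubling_criterion(nu_k=8, n_total=16) is True   # within-episode count has doubled the prior count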
def episode_end_update(self)
is called when an episode ends. In the infinite-horizon case, this refers to artificial episodes. It updates the confidence parameter delta, re-solves the optimistic model, and folds the data collected during the episode into the estimated rewards and transition probabilities.
def step_update(
    self, ts_t: dm_env.TimeStep, a_t: "ACTION_TYPE", ts_tp1: dm_env.TimeStep, h: int
)
adds the given transition to the MDP model.
Parameters
- ts_t (dm_env.TimeStep): The TimeStep at time t.
- a_t ("ACTION_TYPE"): The action taken by the agent at time t.
- ts_tp1 (dm_env.TimeStep): The TimeStep at time t + 1.
- h (int): The current time of the environment. In the episodic case, this refers to the in-episode time, whereas in the continuous case this refers to the total number of previous interactions.
def model_update(self)
updates the model given the transitions obtained during the artificial episode.
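The reward statistics are maintained incrementally: the running mean is updated in place and the variance proxy accumulates (r - old_mean) * (r - new_mean), in the spirit of Welford's online algorithm. A standalone sketch of the update, with a hypothetical helper that is not part of the library:

def incremental_reward_update(mean: float, var_proxy: float, count: int, r: float):
    """Fold one new reward sample into the running mean and the sum of squared deviations."""
    new_count = count + 1
    new_mean = mean * count / new_count + r / new_count
    new_var_proxy = var_proxy + (r - mean) * (r - new_mean)
    return new_mean, new_var_proxy, new_count

# The sample variance used by the Bernstein bound is then var_proxy / max(1, count - 1).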
def beta_r(self, nb_observations) -> np.ndarray
calculates the confidence bounds on the reward.
Returns
- np.array: The confidence bounds on the reward function, of shape |S| x |A|.
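Reading the implementation above, with N(s, a) the visit count (clipped at 1), t the current iteration, delta the current confidence level, and r_max the maximum reward, the two bounds can be written as follows; this is a reconstruction from the code, not a quotation from the papers:

\[ \beta_r^{\mathrm{chernoff}}(s,a) = \alpha_r \, r_{\max} \sqrt{\frac{3.5 \, \log\big(2 S A (t+1)/\delta\big)}{\max(1, N(s,a))}} \]

\[ \beta_r^{\mathrm{bernstein}}(s,a) = \sqrt{\frac{14 \, \alpha_r \, \widehat{\mathrm{Var}}[r](s,a) \, \log\big(2 S A (t+1)/\delta\big)}{N(s,a)}} + \frac{49 \, \alpha_r \, r_{\max}}{3 \, (N(s,a)-1)} \, \log\big(2 S A (t+1)/\delta\big) \]

where \( \widehat{\mathrm{Var}}[r](s,a) \) is the empirical reward variance and the denominator N(s,a) - 1 is also clipped at 1.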
def beta_p(self, nb_observations) -> np.ndarray
calculates the confidence bounds on the transition probabilities.
Returns
- np.array: The confidence bounds on the transition probabilities, of shape |S| x |A| x 1 for the Chernoff bound (shared across next states) and |S| x |A| x |S| for the Bernstein bound.
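Analogously, a reconstruction of the transition bounds implemented above, with \( \hat{p} = \hat{P}(s' \mid s, a) \) the empirical transition probability:

\[ \beta_p^{\mathrm{chernoff}}(s,a) = \alpha_p \sqrt{\frac{14 \, S \, \log\big(2 A (t+1)/\delta\big)}{\max(1, N(s,a))}} \]

\[ \beta_p^{\mathrm{bernstein}}(s,a,s') = \sqrt{\frac{14 \, \alpha_p \, \hat{p}(1-\hat{p}) \, \log\big(2 S A (t+1)/\delta\big)}{N(s,a)}} + \frac{49 \, \alpha_p}{3 \, (N(s,a)-1)} \, \log\big(2 S A (t+1)/\delta\big) \]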
def solve_optimistic_model(self) -> Union[None, float]
runs extended value iteration on the optimistic MDP model and updates the actor's Q-value estimates.
Returns
- Union[None, float]: The span value of the estimates from the optimistic value iteration or None if no solution has been found.
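For orientation, a schematic of how these hooks fit together in a continuing interaction loop. The loop is normally driven by colosseum's own experiment machinery; the sketch below only uses the methods documented on this page together with a hypothetical dm_env-style `env` and a hypothetical `select_action` helper standing in for the agent's actor:

agent.before_start_interacting()          # solve the initial optimistic model
ts_t = env.reset()                        # hypothetical dm_env-style environment
for t in range(optimization_horizon):
    a_t = select_action(agent, ts_t)      # hypothetical: delegate to the agent's actor
    ts_tp1 = env.step(a_t)
    agent.step_update(ts_t, a_t, ts_tp1, t)
    if agent.is_episode_end(ts_t, a_t, ts_tp1, t):
        agent.episode_end_update()        # re-solve the optimistic model with the new data
    ts_t = ts_tp1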