colosseum.hardness.analysis
import os
from copy import deepcopy
from dataclasses import dataclass
from multiprocessing import Pool
from typing import (
    TYPE_CHECKING,
    Any,
    Dict,
    Iterable,
    Type,
    Tuple,
    Callable,
    Union,
    Collection,
)

import pandas as pd
import seaborn as sns
from matplotlib import pyplot as plt

from colosseum import config
from colosseum.agent.agents.episodic import PSRLEpisodic
from colosseum.agent.agents.infinite_horizon import UCRL2Continuous
from colosseum.mdp.base import BaseMDP

sns.set_theme()

if TYPE_CHECKING:
    from colosseum.agent.agents.base import BaseAgent


@dataclass()
class SingleInstanceHyperOptConfig:
    mdp_class: Type["BaseMDP"]
    """The class of the MDP with respect to which we are performing the parameter optimization."""
    mdp_parameters: Dict[str, Any]
    """The dictionary containing the parameters for the MDP class with respect to which we are performing the parameter optimization."""
    n_seed: int
    """The number of seeds for which the agent/MDP interaction is repeated."""
    optimization_horizon: int
    """The optimization horizon of the agent/MDP interaction."""
    num_samples: int
    """The number of samples of the agent parameters."""
    max_interaction_s: float
    """The maximum amount of seconds allocated to training the agent."""
    log_every: int
    """The length of the interval between logging of the performance indicators."""
    episodic_near_optimal_agent_class: Type["BaseAgent"] = PSRLEpisodic
    """The class of the near-optimal agent for the episodic setting. By default, it is ``PSRLEpisodic``."""
    continuous_near_optimal_agent_class: Type["BaseAgent"] = UCRL2Continuous
    """The class of the near-optimal agent for the continuous setting. By default, it is ``UCRL2Continuous``."""


@dataclass()
class HardnessAnalysisParams:
    mdp_class: Type["BaseMDP"]
    """The class of the MDP whose hardness we are studying."""
    varying_params_name: str
    """The name of the parameter being varied."""
    varying_params_values: Iterable
    """The values of the parameter being varied."""
    fixed_params: Dict[str, Any]
    """The dictionary containing the names and values of the parameters being kept fixed."""
    n_seeds_mdp: int
    """The number of seeds used when instantiating the MDP."""
    hardness_measures: Collection[Union[str, Callable[[Type["BaseMDP"]], float]]] = (
        "diameter",
        "value_norm",
    )
    """An iterable containing either the code name of an available measure of hardness or a function that takes an MDP
    object as input and returns a value."""
    near_optimal_agent_hyperopt_config: SingleInstanceHyperOptConfig = None
    """The parameter optimization configuration for the near-optimal agent. By default, it is None, which means
    that the regret of the near-optimal agent with tuned parameters is not computed and used as a proxy for a
    complete measure of hardness."""
    varying_params_name_clean: str = None
    """The name of the parameter being varied in a clean format."""
    retrieve_from_cache: bool = True
    """If True, the ``config.get_hardness_measures_cache_folder()`` is searched for a cached value of the measure.
    By default, it is True."""

    @property
    def clean_varying_prm_name(self) -> str:
        """
        Returns
        -------
        str
            A nicely formatted name for the varying parameter.
        """
        if self.varying_params_name_clean is None:
            return self.varying_params_name
        return self.varying_params_name_clean


def run_scenario_analysis(
    hap: HardnessAnalysisParams,
    ax=None,
):
    """
    Runs a hardness analysis scenario.

    Parameters
    ----------
    hap : HardnessAnalysisParams
        The hardness analysis scenario to run.
    ax : plt.Axes
        The ax object where the plot will be put. By default, a new axis is created.
    """

    show = ax is None
    if ax is None:
        fig, ax = plt.subplots()

    dfs = get_varying_parameter_dfs(hap, normalize_measures=True)

    for measure_name, df in dfs.items():
        sns.lineplot(
            x=hap.clean_varying_prm_name,
            y=measure_name,
            data=df,
            ax=ax,
            label=measure_name,
        )
    plt.ylabel("Hardness measure value")
    plt.legend()

    if show:
        plt.show()


def get_varying_parameter_dfs(
    hap: HardnessAnalysisParams,
    normalize_measures: bool = False,
) -> Dict[str, pd.DataFrame]:
    """
    Computes the hardness measures for the scenario and stores them in `pd.DataFrame`s.

    Parameters
    ----------
    hap : HardnessAnalysisParams
        The hardness analysis scenario to run.
    normalize_measures : bool
        If True, the values of the hardness measures are normalized.

    Returns
    -------
    Dict[str, pd.DataFrame]
        A dictionary that associates the name of a hardness measure to its corresponding pd.DataFrame.
    """

    if hap.near_optimal_agent_hyperopt_config is not None:
        raise NotImplementedError(
            "The computation of the regret of the near optimal agent as a proxy for a complete measure of hardness is "
            "being refactored at the moment."
        )

    measure_results = dict()
    if config.get_available_cores() > 1:
        inputs = [
            (
                hap.mdp_class,
                hap.fixed_params,
                hap.varying_params_name,
                varying_value,
                seed,
                measure,
                hap.retrieve_from_cache,
            )
            for varying_value in hap.varying_params_values
            for measure in hap.hardness_measures
            for seed in range(hap.n_seeds_mdp)
        ]

        with Pool(processes=config.get_available_cores()) as p:
            for measure_name, varying_value, seed, measure_value in p.starmap_async(
                _compute_hardness_measure, inputs
            ).get():
                _add_result(
                    measure_results, measure_name, varying_value, seed, measure_value
                )
    else:
        for seed in range(hap.n_seeds_mdp):
            for measure in hap.hardness_measures:
                for varying_value in hap.varying_params_values:
                    out = compute_hardness_measure_for_varying_prm(
                        hap.mdp_class,
                        hap.fixed_params,
                        hap.varying_params_name,
                        varying_value,
                        seed,
                        measure,
                        force_single_core=True,
                        retrieve_from_cache=hap.retrieve_from_cache,
                        return_n_states=hap.varying_params_name == "size",
                    )
                    if hap.varying_params_name == "size":
                        measure_name, measure_value, n_states = out
                    else:
                        measure_name, measure_value = out

                    _add_result(
                        measure_results,
                        measure_name,
                        varying_value
                        if hap.varying_params_name != "size"
                        else n_states,
                        seed,
                        measure_value,
                    )

    for measure_name in measure_results:
        # Create a Pandas DataFrame
        df = pd.DataFrame.from_dict(measure_results[measure_name])

        # Normalize the values if required
        if normalize_measures:
            min_value = df.loc[:, measure_name].min()
            max_value = df.loc[:, measure_name].max()
            if max_value > min_value + 1e-4:
                df.loc[:, measure_name] = (df.loc[:, measure_name] - min_value) / (
                    max_value - min_value
                )
            else:
                df.loc[:, measure_name] = 0.5

        # Clean the varying parameter name
        df = df.rename(columns={"Varying value": hap.clean_varying_prm_name}).set_index(
            hap.clean_varying_prm_name
        )

        measure_results[measure_name] = df

    return measure_results


def _compute_hardness_measure(
    mdp_class,
    fixed_params,
    varying_params_name,
    varying_value,
    seed,
    measure,
    retrieve_from_cache,
    return_n_states: bool = False,
):
    measure_name, measure_value = compute_hardness_measure_for_varying_prm(
        mdp_class,
        fixed_params,
        varying_params_name,
        varying_value,
        seed,
        measure,
        force_single_core=True,
        retrieve_from_cache=retrieve_from_cache,
    )
    return measure_name, varying_value, seed, measure_value


def compute_hardness_measure_for_varying_prm(
    mdp_class: Type["BaseMDP"],
    fixed_params: Dict[str, Any],
    varying_params_name: str,
    varying_value: Any,
    seed: int,
    measure: Union[str, Callable[[BaseMDP], float]],
    force_single_core: bool = False,
    retrieve_from_cache: bool = True,
    folder: str = None,
    return_n_states: bool = False,
) -> Tuple[str, float, int]:
    """
    Computes the hardness measure for a given value of the varying parameter.

    Parameters
    ----------
    mdp_class : Type["BaseMDP"]
        The MDP class for which the measures will be computed.
    fixed_params : Dict[str, Any]
        The parameters of the MDP that are being kept fixed.
    varying_params_name : str
        The name of the varying parameter.
    varying_value : Any
        The value of the parameter that is varying.
    seed : int
        The random seed.
    measure : Union[str, Callable[[BaseMDP], float]]
        The measure to be computed. It can be given as a function from MDP instances to float or as a string. If given
        as a string, it is looked up among the measures available in the package.
    force_single_core : bool
        If True, the computation of the measure is forced to use only a single core. Note that this is not enforced when
        the measure is given as a function. By default, single processing is not enforced.
    retrieve_from_cache : bool
        If True, the function will look for cached values of the measure. Note that this also holds if the measure is
        given as a function.
    folder : str
        The folder where cached values are looked for. By default, it is `config.get_hardness_measures_cache_folder()`.
    return_n_states : bool
        If True, the number of states is returned.

    Returns
    -------
    str
        The nicely formatted name of the measure.
    float
        The value of the measure.
    int, optional
        The number of states.
    """

    # Instantiate the MDP parameters
    mdp_kwargs = deepcopy(fixed_params)
    mdp_kwargs["seed"] = seed
    mdp_kwargs[varying_params_name] = varying_value

    return compute_hardness_measure(
        mdp_class,
        mdp_kwargs,
        measure,
        force_single_core,
        retrieve_from_cache,
        folder,
        True,
        return_n_states,
    )


def compute_hardness_measure(
    mdp_class: Type["BaseMDP"],
    mdp_params: Dict[str, Any],
    measure: Union[str, Callable[[BaseMDP], float]],
    force_single_core: bool = False,
    retrieve_from_cache: bool = True,
    folder: str = None,
    return_measure_name: bool = False,
    return_n_states: bool = False,
) -> Union[float, Tuple[str, float], Tuple[float, int], Tuple[str, float, int]]:
    """
    Computes a hardness measure for an MDP instance obtained from the given class and parameters.

    Parameters
    ----------
    mdp_class : Type["BaseMDP"]
        The MDP class for which the measures will be computed.
    mdp_params : Dict[str, Any]
        The parameters for the MDP.
    measure : Union[str, Callable[[BaseMDP], float]]
        The measure to be computed. It can be given as a function from MDP instances to float or as a string. If given
        as a string, it is looked up among the measures available in the package.
    force_single_core : bool
        If True, the computation of the measure is forced to use only a single core. Note that this is not enforced when
        the measure is given as a function. By default, single processing is not enforced.
    retrieve_from_cache : bool
        If True, the function will look for cached values of the measure. Note that this also holds if the measure is
        given as a function.
    folder : str
        The folder where cached values are looked for. By default, it is `config.get_hardness_measures_cache_folder()`.
    return_measure_name : bool
        If True, a nicely formatted name for the measure is returned.
    return_n_states : bool
        If True, the number of states is returned.

    Returns
    -------
    str, optional
        The nicely formatted name of the measure.
    float
        The value of the measure.
    int, optional
        The number of states.
    """

    # Obtain the name of the measure and the function to compute it
    measure_name, measure_f = _process_measure(measure)

    # Check if the measure has already been computed
    if retrieve_from_cache:
        mdp_shell = mdp_class(
            **mdp_params, instantiate_mdp=False, exclude_horizon_from_parameters=True
        )
        if folder is None:
            folder = (
                config.get_hardness_measures_cache_folder()
                + mdp_class.__name__
                + os.sep
            )

        measure_file_path = f"{folder}{measure_name}_{mdp_shell.hash}.txt"
        if os.path.isfile(measure_file_path):
            with open(measure_file_path, "r") as f:
                measure_value = float(f.read())

            out = [measure_value]
            if return_measure_name:
                out.insert(0, measure_name)
            if return_n_states:
                mdp_shell.instantiate_MDP()
                out.append(mdp_shell.n_states)
            return out

    # Possibly force the computation to avoid using multiple cores
    if force_single_core and config.get_available_cores() > 1:
        available_cores = config.get_available_cores()
        config.disable_multiprocessing()
        mdp = mdp_class(**mdp_params)
        measure_value = measure_f(mdp)
        config.set_available_cores(available_cores)
    else:
        mdp = mdp_class(**mdp_params)
        measure_value = measure_f(mdp)

    # Cache the value of the measure (only in case we were supposed to look for it in the first place)
    if retrieve_from_cache:
        os.makedirs(os.path.dirname(measure_file_path), exist_ok=True)
        with open(measure_file_path, "w") as f:
            f.write(str(measure_value))

    out = [measure_value]
    if return_measure_name:
        out.insert(0, measure_name)
    if return_n_states:
        out.append(mdp.n_states)
    return out


def _process_measure(
    measure: Union[str, Callable[[BaseMDP], float]]
) -> Tuple[str, Callable[[BaseMDP], float]]:
    if type(measure) == str:
        measure_name = measure
        if measure_name not in BaseMDP.get_available_hardness_measures():
            raise ValueError(
                f"{measure} is not a valid hardness measure, the available ones are: "
                f"{BaseMDP.get_available_hardness_measures()}"
            )
        return measure, lambda mdp: mdp.get_measure_from_name(measure)
    elif callable(measure):
        return measure.__name__, measure
    else:
        raise ValueError(
            f"The measure should either be a string or a Callable, {type(measure)} received."
        )


def _add_result(measure_results, measure_name, varying_value, seed, measure_value):
    measure_name = measure_name.capitalize().replace("_", " ")

    if measure_name not in measure_results:
        measure_results[measure_name] = {
            "Varying value": [],
            measure_name: [],
            "Seed": [],
        }
    measure_results[measure_name]["Varying value"].append(varying_value)
    measure_results[measure_name]["Seed"].append(seed)
    measure_results[measure_name][measure_name].append(measure_value)
@dataclass()
class SingleInstanceHyperOptConfig:
    mdp_class: Type["BaseMDP"]
    """The class of the MDP with respect to which we are performing the parameter optimization."""
    mdp_parameters: Dict[str, Any]
    """The dictionary containing the parameters for the MDP class with respect to which we are performing the parameter optimization."""
    n_seed: int
    """The number of seeds for which the agent/MDP interaction is repeated."""
    optimization_horizon: int
    """The optimization horizon of the agent/MDP interaction."""
    num_samples: int
    """The number of samples of the agent parameters."""
    max_interaction_s: float
    """The maximum amount of seconds allocated to training the agent."""
    log_every: int
    """The length of the interval between logging of the performance indicators."""
    episodic_near_optimal_agent_class: Type["BaseAgent"] = PSRLEpisodic
    """The class of the near-optimal agent for the episodic setting. By default, it is ``PSRLEpisodic``."""
    continuous_near_optimal_agent_class: Type["BaseAgent"] = UCRL2Continuous
    """The class of the near-optimal agent for the continuous setting. By default, it is ``UCRL2Continuous``."""
mdp_class: The class of the MDP with respect to which the parameter optimization is performed.
@gin.configurable
class PSRLEpisodic(BaseAgent):
    """
    The posterior sampling for reinforcement learning algorithm.

    Osband, Ian, Daniel Russo, and Benjamin Van Roy. "(More) efficient reinforcement learning via posterior sampling."
    Advances in Neural Information Processing Systems 26 (2013).
    """

    def step_update(
        self,
        ts_t: dm_env.TimeStep,
        a_t: "ACTION_TYPE",
        ts_tp1: dm_env.TimeStep,
        time: int,
    ):
        super(PSRLEpisodic, self).step_update(ts_t, a_t, ts_tp1, time)

    @staticmethod
    def is_emission_map_accepted(emission_map: "EmissionMap") -> bool:
        return emission_map.is_tabular

    @staticmethod
    def produce_gin_file_from_parameters(parameters: Dict[str, Any], index: int = 0):
        return (
            "from colosseum.agent.mdp_models import bayesian_models\n"
            f"prms_{index}/PSRLEpisodic.reward_prior_model = %bayesian_models.RewardsConjugateModel.N_NIG\n"
            f"prms_{index}/PSRLEpisodic.transitions_prior_model = %bayesian_models.TransitionsConjugateModel.M_DIR\n"
            f"prms_{index}/PSRLEpisodic.rewards_prior_prms = [{parameters['prior_mean']}, 1, 1, 1]\n"
            f"prms_{index}/PSRLEpisodic.transitions_prior_prms = [{parameters['transition_prior']}]"
        )

    @staticmethod
    def is_episodic() -> bool:
        return True

    @staticmethod
    def get_hyperparameters_search_spaces() -> Dict[str, tune.sample.Domain]:
        return dict(
            prior_mean=tune.uniform(0.001, 2.0), transition_prior=tune.uniform(0.001, 2)
        )

    @staticmethod
    def get_agent_instance_from_parameters(
        seed: int,
        optimization_horizon: int,
        mdp_specs: MDPSpec,
        parameters: Dict[str, Any],
    ) -> "BaseAgent":
        return PSRLEpisodic(
            mdp_specs=mdp_specs,
            seed=seed,
            optimization_horizon=optimization_horizon,
            reward_prior_model=RewardsConjugateModel.N_NIG,
            transitions_prior_model=TransitionsConjugateModel.M_DIR,
            rewards_prior_prms=[parameters["prior_mean"], 1, 1, 1],
            transitions_prior_prms=[parameters["transition_prior"]],
        )

    @property
    def current_optimal_stochastic_policy(self) -> np.ndarray:
        T_map, R_map = self._mdp_model.get_map_estimate()
        Q, _ = episodic_value_iteration(self._time_horizon, T_map, R_map)
        return get_policy_from_q_values(Q, True)

    def __init__(
        self,
        seed: int,
        mdp_specs: MDPSpec,
        optimization_horizon: int,
        # MDP model parameters
        reward_prior_model: RewardsConjugateModel = None,
        transitions_prior_model: TransitionsConjugateModel = None,
        rewards_prior_prms=None,
        transitions_prior_prms=None,
        # Actor parameters
        epsilon_greedy: Union[float, Callable] = None,
        boltzmann_temperature: Union[float, Callable] = None,
    ):
        """
        Parameters
        ----------
        seed : int
            The random seed.
        mdp_specs : MDPSpec
            The full specification of the MDP.
        optimization_horizon : int
            The total number of interactions that the agent is expected to have with the MDP.
        reward_prior_model : RewardsConjugateModel, optional
            The reward priors.
        transitions_prior_model : TransitionsConjugateModel, optional
            The transitions priors.
        rewards_prior_prms : Any
            The reward prior parameters.
        transitions_prior_prms : Any
            The transitions prior parameters.
        epsilon_greedy : Union[float, Callable], optional
            The probability of selecting an action at random. It can be provided as a float or as a function of the
            total number of interactions. By default, the probability is set to zero.
        boltzmann_temperature : Union[float, Callable], optional
            The parameter that controls the Boltzmann exploration. It can be provided as a float or as a function of
            the total number of interactions. By default, Boltzmann exploration is disabled.
        """

        mdp_model = BayesianMDPModel(
            seed,
            mdp_specs,
            reward_prior_model=reward_prior_model,
            transitions_prior_model=transitions_prior_model,
            rewards_prior_prms=rewards_prior_prms,
            transitions_prior_prms=transitions_prior_prms,
        )
        actor = QValuesActor(seed, mdp_specs, epsilon_greedy, boltzmann_temperature)

        super(PSRLEpisodic, self).__init__(
            seed,
            mdp_specs,
            mdp_model,
            actor,
            optimization_horizon,
        )

    def episode_end_update(self):
        Q, _ = episodic_value_iteration(self._time_horizon, *self._mdp_model.sample())
        self._actor.set_q_values(Q)

    def before_start_interacting(self):
        self.episode_end_update()
episodic_near_optimal_agent_class: The class of the near-optimal agent for the episodic setting. By default, it is PSRLEpisodic.
Inherited Members
- colosseum.agent.agents.episodic.posterior_sampling.PSRLEpisodic
- PSRLEpisodic
- step_update
- is_emission_map_accepted
- produce_gin_file_from_parameters
- is_episodic
- get_hyperparameters_search_spaces
- get_agent_instance_from_parameters
- current_optimal_stochastic_policy
- episode_end_update
- before_start_interacting
@gin.configurable
class UCRL2Continuous(BaseAgent):
    """
    The second version of upper confidence for reinforcement learning algorithm.

    Auer, Peter, Thomas Jaksch, and Ronald Ortner. "Near-optimal regret bounds for reinforcement learning." Advances in
    neural information processing systems 21 (2008).

    Fruit, Ronan, Matteo Pirotta, and Alessandro Lazaric. "Improved analysis of ucrl2 with empirical bernstein inequality."
    arXiv preprint arXiv:2007.05456 (2020).
    """

    @staticmethod
    def is_emission_map_accepted(emission_map: "EmissionMap") -> bool:
        return emission_map.is_tabular

    @staticmethod
    def produce_gin_file_from_parameters(parameters: Dict[str, Any], index: int = 0):
        string = f"prms_{index}/UCRL2Continuous.bound_type_p='bernstein'\n"
        for k, v in parameters.items():
            string += f"prms_{index}/UCRL2Continuous.{k} = {v}\n"
        return string[:-1]

    @staticmethod
    def is_episodic() -> bool:
        return False

    @staticmethod
    def get_hyperparameters_search_spaces() -> Dict[str, tune.sample.Domain]:
        return {"alpha_p": tune.uniform(0.1, 3), "alpha_r": tune.uniform(0.1, 3)}

    @staticmethod
    def get_agent_instance_from_parameters(
        seed: int,
        optimization_horizon: int,
        mdp_specs: "MDPSpec",
        parameters: Dict[str, Any],
    ) -> "BaseAgent":
        return UCRL2Continuous(
            mdp_specs=mdp_specs,
            seed=seed,
            optimization_horizon=optimization_horizon,
            alpha_p=parameters["alpha_p"],
            alpha_r=parameters["alpha_r"],
            bound_type_p="bernstein",
        )

    @property
    def current_optimal_stochastic_policy(self) -> np.ndarray:
        Q, _ = discounted_value_iteration(self.P, self.estimated_rewards)
        return get_policy_from_q_values(Q, True)

    def __init__(
        self,
        seed: int,
        mdp_specs: "MDPSpec",
        optimization_horizon: int,
        # MDP model parameters
        alpha_r=1.0,
        alpha_p=1.0,
        bound_type_p="_chernoff",
        bound_type_rew="_chernoff",
        # Actor parameters
        epsilon_greedy: Union[float, Callable] = None,
        boltzmann_temperature: Union[float, Callable] = None,
    ):
        r"""
        Parameters
        ----------
        seed : int
            The random seed.
        mdp_specs : MDPSpec
            The full specification of the MDP.
        optimization_horizon : int
            The total number of interactions that the agent is expected to have with the MDP.
        alpha_r : float
            The :math:`\alpha` parameter for the rewards. By default, it is set to one.
        alpha_p : float
            The :math:`\alpha` parameter for the transitions. By default, it is set to one.
        bound_type_p : str
            The upper confidence bound type for the transitions. It can either be '_chernoff' or 'bernstein'. By default,
            it is set to '_chernoff'.
        bound_type_rew : str
            The upper confidence bound type for the rewards. It can either be '_chernoff' or 'bernstein'. By default,
            it is set to '_chernoff'.
        epsilon_greedy : Union[float, Callable], optional
            The probability of selecting an action at random. It can be provided as a float or as a function of the
            total number of interactions. By default, the probability is set to zero.
        boltzmann_temperature : Union[float, Callable], optional
            The parameter that controls the Boltzmann exploration. It can be provided as a float or as a function of
            the total number of interactions. By default, Boltzmann exploration is disabled.
        """

        n_states = self._n_states = mdp_specs.observations.num_values
        n_actions = self._n_actions = mdp_specs.actions.num_values
        self.reward_range = mdp_specs.rewards_range

        assert bound_type_p in ["_chernoff", "bernstein"]
        assert bound_type_rew in ["_chernoff", "bernstein"]

        self.alpha_p = alpha_p
        self.alpha_r = alpha_r

        # initialize matrices
        self.policy = np.zeros((n_states,), dtype=np.int_)
        self.policy_indices = np.zeros((n_states,), dtype=np.int_)

        # initialization
        self.iteration = 0
        self.episode = 0
        self.delta = 1.0  # confidence
        self.bound_type_p = bound_type_p
        self.bound_type_rew = bound_type_rew

        self.P = np.ones((n_states, n_actions, n_states), np.float32) / n_states

        self.estimated_rewards = (
            np.ones((n_states, n_actions), np.float32) * mdp_specs.rewards_range[1]
        )
        self.variance_proxy_reward = np.zeros((n_states, n_actions), np.float32)
        self.estimated_holding_times = np.ones((n_states, n_actions), np.float32)
        self.N = np.zeros((n_states, n_actions, n_states), dtype=np.int32)

        self.current_state = None
        self.artificial_episode = 0
        self.episode_reward_data = dict()
        self.episode_transition_data = dict()

        super(UCRL2Continuous, self).__init__(
            seed,
            mdp_specs,
            None,
            QValuesActor(seed, mdp_specs, epsilon_greedy, boltzmann_temperature),
            optimization_horizon,
        )

    def is_episode_end(
        self,
        ts_t: dm_env.TimeStep,
        a_t: "ACTION_TYPE",
        ts_tp1: dm_env.TimeStep,
        time: int,
    ) -> bool:
        nu_k = len(self.episode_transition_data[ts_t.observation, a_t])
        return nu_k >= max(1, self.N[ts_t.observation, a_t].sum() - nu_k)

    def episode_end_update(self):
        self.episode += 1
        self.delta = 1 / math.sqrt(self.iteration + 1)

        new_sp = self.solve_optimistic_model()
        if new_sp is not None:
            self.span_value = new_sp / self.reward_range[1]

        if len(self.episode_transition_data) > 0:
            self.model_update()
            self.episode_reward_data = dict()
            self.episode_transition_data = dict()

    def before_start_interacting(self):
        self.episode_end_update()

    def step_update(
        self, ts_t: dm_env.TimeStep, a_t: "ACTION_TYPE", ts_tp1: dm_env.TimeStep, h: int
    ):
        self.N[ts_t.observation, a_t, ts_tp1.observation] += 1

        if (ts_t.observation, a_t) in self.episode_reward_data:
            self.episode_reward_data[ts_t.observation, a_t].append(ts_tp1.reward)
            if not ts_tp1.last():
                self.episode_transition_data[ts_t.observation, a_t].append(
                    ts_tp1.observation
                )
        else:
            self.episode_reward_data[ts_t.observation, a_t] = [ts_tp1.reward]
            if not ts_tp1.last():
                self.episode_transition_data[ts_t.observation, a_t] = [
                    ts_tp1.observation
                ]

    def model_update(self):
        """
        updates the model given the transitions obtained during the artificial episode.
        """
        for (s_tm1, action), r_ts in self.episode_reward_data.items():
            # updated observations
            scale_f = self.N[s_tm1, action].sum()
            for r in r_ts:
                # update the number of total iterations
                self.iteration += 1

                # update reward and variance estimate
                scale_f += 1
                old_estimated_reward = self.estimated_rewards[s_tm1, action]
                self.estimated_rewards[s_tm1, action] *= scale_f / (scale_f + 1.0)
                self.estimated_rewards[s_tm1, action] += r / (scale_f + 1.0)
                self.variance_proxy_reward[s_tm1, action] += (
                    r - old_estimated_reward
                ) * (r - self.estimated_rewards[s_tm1, action])

                # update holding time
                self.estimated_holding_times[s_tm1, action] *= scale_f / (scale_f + 1.0)
                self.estimated_holding_times[s_tm1, action] += 1 / (scale_f + 1)

        for (s_tm1, action) in set(self.episode_transition_data.keys()):
            self.P[s_tm1, action] = self.N[s_tm1, action] / self.N[s_tm1, action].sum()

    def beta_r(self, nb_observations) -> np.ndarray:
        """
        calculates the confidence bounds on the reward.
        Returns
        -------
        np.array
            The vector of confidence bounds on the reward function (|S| x |A|)
        """
        S = self._n_states
        A = self._n_actions
        if self.bound_type_rew != "bernstein":
            ci = _chernoff(
                it=self.iteration,
                N=nb_observations,
                range=self.reward_range[1],
                delta=self.delta,
                sqrt_C=3.5,
                log_C=2 * S * A,
            )
            return self.alpha_r * ci
        else:
            N = np.maximum(1, nb_observations)
            Nm1 = np.maximum(1, nb_observations - 1)
            var_r = self.variance_proxy_reward / Nm1
            log_value = 2.0 * S * A * (self.iteration + 1) / self.delta
            beta = bernstein(
                scale_a=14 * var_r / N,
                log_scale_a=log_value,
                scale_b=49.0 * self.r_max / (3.0 * Nm1),
                log_scale_b=log_value,
                alpha_1=math.sqrt(self.alpha_r),
                alpha_2=self.alpha_r,
            )
            return beta

    def beta_p(self, nb_observations) -> np.ndarray:
        """
        calculates the confidence bounds on the transition probabilities.
        Returns
        -------
        np.array
            The vector of confidence bounds on the reward function (|S| x |A|)
        """
        S = self._n_states
        A = self._n_actions
        if self.bound_type_p != "bernstein":
            beta = _chernoff(
                it=self.iteration,
                N=nb_observations,
                range=1.0,
                delta=self.delta,
                sqrt_C=14 * S,
                log_C=2 * A,
            )
            return self.alpha_p * beta.reshape([S, A, 1])
        else:
            N = np.maximum(1, nb_observations)
            Nm1 = np.maximum(1, nb_observations - 1)
            var_p = self.P * (1.0 - self.P)
            log_value = 2.0 * S * A * (self.iteration + 1) / self.delta
            beta = bernstein(
                scale_a=14 * var_p / N[:, :, np.newaxis],
                log_scale_a=log_value,
                scale_b=49.0 / (3.0 * Nm1[:, :, np.newaxis]),
                log_scale_b=log_value,
                alpha_1=math.sqrt(self.alpha_p),
                alpha_2=self.alpha_p,
            )
            return beta

    def solve_optimistic_model(self) -> Union[None, float]:
        """
        solves the optimistic value iteration.
        Returns
        -------
        Union[None, float]
            The span value of the estimates from the optimistic value iteration or None if no solution has been found.
        """
        nb_observations = self.N.sum(-1)

        beta_r = self.beta_r(nb_observations)  # confidence bounds on rewards
        beta_p = self.beta_p(
            nb_observations
        )  # confidence bounds on transition probabilities

        T = self.P
        estimated_rewards = self.estimated_rewards

        assert np.isclose(T.sum(-1), 1.0).all()
        try:
            res = extended_value_iteration(
                T, estimated_rewards, beta_r, beta_p, self.reward_range[1]
            )
        except SystemError:
            # Debug logs if the optimistic value iteration fails
            os.makedirs(f"tmp{os.sep}error_ext_vi", exist_ok=True)
            for i in range(100):
                if not os.path.isfile(f"tmp{os.sep}error_ext_vi{os.sep}T{i}.npy"):
                    np.save(f"tmp{os.sep}error_ext_vi{os.sep}T{i}.npy", T)
                    np.save(
                        f"tmp{os.sep}error_ext_vi{os.sep}estimated_rewards{i}.npy",
                        estimated_rewards,
                    )
                    np.save(f"tmp{os.sep}error_ext_vi{os.sep}beta_r.npy{i}", beta_r)
                    np.save(f"tmp{os.sep}error_ext_vi{os.sep}beta_p.npy{i}", beta_p)
                    break
            res = None

        if res is not None:
            span_value_new, self.Q, self.V = res
            span_value = span_value_new
            self._actor.set_q_values(self.Q)

            assert span_value >= 0, "The span value cannot be lower than zero"
            assert np.abs(span_value - span_value_new) < 1e-8

            return span_value
        return None
continuous_near_optimal_agent_class: The class of the near-optimal agent for the continuous setting. By default, it is UCRL2Continuous.
Inherited Members
- colosseum.agent.agents.infinite_horizon.ucrl2.UCRL2Continuous
- UCRL2Continuous
- is_emission_map_accepted
- produce_gin_file_from_parameters
- is_episodic
- get_hyperparameters_search_spaces
- get_agent_instance_from_parameters
- current_optimal_stochastic_policy
- is_episode_end
- episode_end_update
- before_start_interacting
- step_update
- model_update
- beta_r
- beta_p
- solve_optimistic_model
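To make the configuration above concrete, here is a minimal sketch of building a SingleInstanceHyperOptConfig. The RiverSwimEpisodic import path, the MDP parameters, and the numeric budgets are illustrative assumptions rather than values prescribed by the package, and note that get_varying_parameter_dfs below currently raises NotImplementedError when such a config is attached to a HardnessAnalysisParams.

from colosseum.hardness.analysis import SingleInstanceHyperOptConfig
from colosseum.mdp.river_swim import RiverSwimEpisodic  # assumed import path

hyperopt_config = SingleInstanceHyperOptConfig(
    mdp_class=RiverSwimEpisodic,
    mdp_parameters=dict(size=10),  # placeholder MDP parameters
    n_seed=3,                      # repeat the agent/MDP interaction over 3 seeds
    optimization_horizon=50_000,   # interactions per agent/MDP pair
    num_samples=20,                # samples of the agent parameters
    max_interaction_s=600.0,       # time budget per training run, in seconds
    log_every=1_000,               # logging interval for the performance indicators
)
# The episodic/continuous near-optimal agent classes keep their defaults
# (PSRLEpisodic and UCRL2Continuous).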
@dataclass()
class HardnessAnalysisParams:
    mdp_class: Type["BaseMDP"]
    """The class of the MDP whose hardness we are studying."""
    varying_params_name: str
    """The name of the parameter being varied."""
    varying_params_values: Iterable
    """The values of the parameter being varied."""
    fixed_params: Dict[str, Any]
    """The dictionary containing the names and values of the parameters being kept fixed."""
    n_seeds_mdp: int
    """The number of seeds used when instantiating the MDP."""
    hardness_measures: Collection[Union[str, Callable[[Type["BaseMDP"]], float]]] = (
        "diameter",
        "value_norm",
    )
    """An iterable containing either the code name of an available measure of hardness or a function that takes an MDP
    object as input and returns a value."""
    near_optimal_agent_hyperopt_config: SingleInstanceHyperOptConfig = None
    """The parameter optimization configuration for the near-optimal agent. By default, it is None, which means
    that the regret of the near-optimal agent with tuned parameters is not computed and used as a proxy for a
    complete measure of hardness."""
    varying_params_name_clean: str = None
    """The name of the parameter being varied in a clean format."""
    retrieve_from_cache: bool = True
    """If True, the ``config.get_hardness_measures_cache_folder()`` is searched for a cached value of the measure.
    By default, it is True."""

    @property
    def clean_varying_prm_name(self) -> str:
        """
        Returns
        -------
        str
            A nicely formatted name for the varying parameter.
        """
        if self.varying_params_name_clean is None:
            return self.varying_params_name
        return self.varying_params_name_clean
fixed_params: The dictionary containing the names and values of the parameters being kept fixed.
hardness_measures: An iterable containing either the code name of an available measure of hardness or a function that takes an MDP object as input and returns a value.
near_optimal_agent_hyperopt_config: The parameter optimization configuration for the near-optimal agent. By default, it is None, which means that the regret of the near-optimal agent with tuned parameters is not computed and used as a proxy for a complete measure of hardness.
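A minimal sketch of a hardness analysis scenario. The RiverSwimEpisodic class and its size and p_rand parameters are assumptions used for illustration; any MDP class from the package with a parameter to sweep works the same way.

from colosseum.hardness.analysis import HardnessAnalysisParams
from colosseum.mdp.river_swim import RiverSwimEpisodic  # assumed import path

hap = HardnessAnalysisParams(
    mdp_class=RiverSwimEpisodic,
    varying_params_name="size",                    # parameter being swept
    varying_params_values=(5, 10, 15, 20),         # values it takes
    fixed_params=dict(p_rand=0.1),                 # placeholder fixed parameters
    n_seeds_mdp=3,                                 # MDP instantiations per value
    hardness_measures=("diameter", "value_norm"),
    varying_params_name_clean="MDP size",          # label used on the plot axis
)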
run_scenario_analysis(hap: HardnessAnalysisParams, ax=None)
Runs a hardness analysis scenario; a usage sketch follows the parameter list below.
Parameters
- hap (HardnessAnalysisParams): The hardness analysis scenario to run.
- ax (plt.Axes): The ax object where the plot will be put. By default, a new axis is created.
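A usage sketch, reusing the hypothetical hap scenario built above. Passing an existing axis keeps the function from calling plt.show itself, so the caller decides when to display the figure.

import matplotlib.pyplot as plt

from colosseum.hardness.analysis import run_scenario_analysis

fig, ax = plt.subplots(figsize=(6, 4))
run_scenario_analysis(hap, ax=ax)  # one line per hardness measure, drawn on `ax`
plt.show()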
get_varying_parameter_dfs(hap: HardnessAnalysisParams, normalize_measures: bool = False) -> Dict[str, pd.DataFrame]
Computes the hardness measures for the scenario and stores them in pd.DataFrames; a usage sketch follows the Returns section below.
Parameters
- hap (HardnessAnalysisParams): The hardness analysis scenario to run.
- normalize_measures (bool): If True, the values of the hardness measures are normalized.
Returns
- Dict[str, pd.DataFrame]: A dictionary that associates the name of a hardness measure to its corresponding pd.DataFrame.
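A usage sketch, again reusing the hypothetical hap scenario from above. Each returned DataFrame is indexed by the cleaned varying-parameter name and has one row per (parameter value, seed) pair.

from colosseum.hardness.analysis import get_varying_parameter_dfs

dfs = get_varying_parameter_dfs(hap, normalize_measures=True)
for measure_name, df in dfs.items():
    print(measure_name)   # e.g. "Diameter", "Value norm"
    print(df.head())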
compute_hardness_measure_for_varying_prm(
    mdp_class: Type["BaseMDP"],
    fixed_params: Dict[str, Any],
    varying_params_name: str,
    varying_value: Any,
    seed: int,
    measure: Union[str, Callable[[BaseMDP], float]],
    force_single_core: bool = False,
    retrieve_from_cache: bool = True,
    folder: str = None,
    return_n_states: bool = False,
) -> Tuple[str, float, int]
Computes the hardness measure for a given value of the varying parameter.
Parameters
- mdp_class (Type["BaseMDP"]): The MDP class for which the measures will be computed.
- fixed_params (Dict[str, Any]): The parameters of the MDP that are being kept fixed.
- varying_params_name (str): The name of the varying parameter.
- varying_value (Any): The value of the parameter that is varying.
- seed (int): The random seed.
- measure (Union[str, Callable[[BaseMDP], float]]): The measure to be computed. It can be given as a function from MDP instances to float or as a string. If given as a string, it is looked up among the measures available in the package.
- force_single_core (bool): If True, the computation of the measure is forced to use only a single core. Note that this is not enforced when the measure is given as a function. By default, single processing is not enforced.
- retrieve_from_cache (bool): If True, the function will look for cached values of the measure. Note that this also holds if the measure is given as a function.
- folder (str): The folder where cached values are looked for. By default, it is config.get_hardness_measures_cache_folder().
- return_n_states (bool): If True, the number of states is returned.
Returns
- str: The nicely formatted name of the measure.
- float: The value of the measure.
- int, optional: The number of states.
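A minimal sketch of a direct call, with the same hypothetical RiverSwimEpisodic class and placeholder parameters as in the earlier sketches.

from colosseum.hardness.analysis import compute_hardness_measure_for_varying_prm

# Compute the diameter for a single configuration, with size=10 as the varying value.
measure_name, measure_value = compute_hardness_measure_for_varying_prm(
    mdp_class=RiverSwimEpisodic,     # assumed MDP class, see above
    fixed_params=dict(p_rand=0.1),   # placeholder fixed parameters
    varying_params_name="size",
    varying_value=10,
    seed=0,
    measure="diameter",
)
print(measure_name, measure_value)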
compute_hardness_measure(
    mdp_class: Type["BaseMDP"],
    mdp_params: Dict[str, Any],
    measure: Union[str, Callable[[BaseMDP], float]],
    force_single_core: bool = False,
    retrieve_from_cache: bool = True,
    folder: str = None,
    return_measure_name: bool = False,
    return_n_states: bool = False,
) -> Union[float, Tuple[str, float], Tuple[float, int], Tuple[str, float, int]]
Parameters
- mdp_class (Type["BaseMDP"]): The MDP class for which the measures will be computed.
- mdp_params (Dict[str, Any]): The parameters for the MDP.
- measure (Union[str, Callable[[BaseMDP], float]]): The measure to be computed. It can be given as a function from MDP instances to float or as a string. If given as a string, it is looked up among the measures available in the package.
- force_single_core (bool): If True, the computation of the measure is forced to use only a single core. Note that this is not enforced when the measure is given as a function. By default, single processing is not enforced.
- retrieve_from_cache (bool): If True, the function will look for cached values of the measure. Note that this also holds if the measure is given as a function.
- folder (str): The folder where cached values are looked for. By default, it is config.get_hardness_measures_cache_folder().
- return_measure_name (bool): If True, a nicely formatted name for the measure is returned.
- return_n_states (bool): If True, the number of states is returned.
Returns
- str, optional: The nicely formatted name of the measure.
- float: The value of the measure.
- int, optional: The number of states.
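A minimal sketch using a custom callable measure. The function name, MDP class, and parameters are illustrative assumptions; only attributes referenced in the module source (such as n_states) are used, and the function's __name__ is taken as the measure name.

from colosseum.hardness.analysis import compute_hardness_measure

def n_states_measure(mdp) -> float:
    # Any function from an MDP instance to a float can serve as a measure.
    return float(mdp.n_states)

value = compute_hardness_measure(
    mdp_class=RiverSwimEpisodic,                    # assumed MDP class, see above
    mdp_params=dict(seed=0, size=10, p_rand=0.1),   # placeholder parameters
    measure=n_states_measure,
    retrieve_from_cache=False,                      # skip the cache for the custom measure
)
print(value)  # a one-element list containing the measure value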