colosseum.experiment.agent_mdp_interaction
```python
import io
from time import time
from typing import TYPE_CHECKING, Dict, Iterable, List, Tuple, Union, Set

import numpy as np
import pandas as pd
import seaborn as sns
import toolz
import tqdm
from matplotlib import pyplot as plt
from tqdm import trange
from wrapt_timeout_decorator import timeout

from colosseum import config
from colosseum.config import process_debug_output
from colosseum.emission_maps import Tabular
from colosseum.experiment.indicators import (
    get_episodic_regrets_and_average_reward_at_time_zero,
)
from colosseum.mdp.utils.markov_chain import get_average_reward
from colosseum.utils.acme import InMemoryLogger
from colosseum.utils.acme.base_logger import Logger
from colosseum.utils.formatter import clear_agent_mdp_class_name

if TYPE_CHECKING:
    from colosseum.mdp import ContinuousMDP, EpisodicMDP, BaseMDP
    from colosseum.agent.agents.base import BaseAgent

sns.set_theme()


class MDPLoop:
    """
    The `MDPLoop` is the object in charge of the agent/MDP interactions and the computation of the performance indicators.
    It also provides limited plotting functionalities.
    """

    @staticmethod
    def get_indicators() -> List[str]:
        """
        Returns
        -------
        List[str]
            The code names for the indicators that are computed by the MDPLoop.
        """
        return [
            "cumulative_expected_reward",
            "cumulative_regret",
            "cumulative_reward",
            "normalized_cumulative_expected_reward",
            "normalized_cumulative_regret",
            "normalized_cumulative_reward",
            "steps_per_second",
        ]

    @staticmethod
    def get_baseline_indicators() -> List[str]:
        """
        Returns
        -------
        List[str]
            The code names for the baseline indicators that are computed by the MDPLoop.
        """
        return [
            "random_cumulative_regret",
            "random_cumulative_expected_reward",
            "random_normalized_cumulative_regret",
            "random_normalized_cumulative_expected_reward",
            "optimal_cumulative_expected_reward",
            "optimal_normalized_cumulative_expected_reward",
            "worst_cumulative_regret",
            "worst_cumulative_expected_reward",
            "worst_normalized_cumulative_regret",
            "worst_normalized_cumulative_expected_reward",
        ]

    @staticmethod
    def get_baselines() -> Set[str]:
        """
        Returns
        -------
        Set[str]
            The baselines available for comparison.
        """
        return set(b[: b.find("_")] for b in MDPLoop.get_baseline_indicators())

    @staticmethod
    def get_baselines_color_dict() -> Dict[str, str]:
        """
        Returns
        -------
        Dict[str, str]
            The color associated by default with each baseline.
        """
        return dict(random="black", worst="crimson", optimal="gold")

    @staticmethod
    def get_baselines_style_dict():
        """
        Returns
        -------
        Dict[str, tuple]
            The line style associated by default with each baseline.
        """
        return dict(random=(0, (6, 12)), worst=(9, (6, 12)), optimal=(0, (6, 12)))

    def __init__(
        self,
        mdp: Union["BaseMDP", "EpisodicMDP", "ContinuousMDP"],
        agent: "BaseAgent",
        logger: Logger = None,
        n_log_intervals_to_check_for_agent_optimality: int = 10,
        enforce_time_constraint: bool = True,
    ) -> object:
        """
        Parameters
        ----------
        mdp : Union["BaseMDP", "EpisodicMDP", "ContinuousMDP"]
            The MDP.
        agent : BaseAgent
            The agent.
        logger : Logger
            The logger where the results of the interaction between the agent and the MDP are stored. By default, the
            `InMemoryLogger` is used.
        n_log_intervals_to_check_for_agent_optimality : int
            The number of most recent logging intervals over which the policy is checked for optimality. By default,
            the last ten logged regrets are used.
        enforce_time_constraint : bool
            If True, the computational time constraint given in the `run` function is enforced through multithreading.
            By default, it is enforced.
        """

        if logger is None:
            logger = InMemoryLogger()

        self.logger = logger
        self._enforce_time_constraint = enforce_time_constraint
        self._mdp = mdp
        self._agent = agent
        self._episodic = self._mdp.is_episodic()
        self._n_steps_to_check_for_agent_optimality = (
            n_log_intervals_to_check_for_agent_optimality
        )
        assert self._episodic == agent.is_episodic()
        assert self._agent.is_emission_map_accepted(
            Tabular if self._mdp.emission_map is None else self._mdp.emission_map
        )
        self.actions_sequence = []

    @property
    def remaining_time(self) -> float:
        """
        Returns
        -------
        float
            The remaining computational time for training the agent.
        """
        return self._max_time - (time() - self._mdp_loop_timer)

    def _limit_update_time(self, t, f):
        try:
            if self.remaining_time < 0.5:
                raise TimeoutError()
            # Run f with a hard time limit equal to the remaining time budget.
            timeout(self.remaining_time)(f)()
        except (TimeoutError, SystemError):
            if config._DEBUG_LEVEL > 0:
                print("Time exceeded with function ", f)
            self._limit_exceeded(t)

    def _limit_exceeded(self, t):
        self._is_training = False
        self._last_training_step = t
        if config._DEBUG_LEVEL > 0:
            do = f"Stopped training at {time() - self._mdp_loop_timer:.2f}"
            process_debug_output(do)
        if self._verbose:
            self._verbose_postfix["is_training"] = f"No, time exhausted at {t}"

    def run(
        self,
        T: int,
        log_every: int = -1,
        max_time: float = np.inf,
    ) -> Tuple[int, Dict[str, float]]:
        """
        runs the agent/MDP interactions.

        Parameters
        ----------
        T : int
            The number of total interactions between the agent and the MDP.
        log_every : int
            The number of time steps between consecutive computations of the performance indicators. By default, the
            indicators are only computed at the last time step.
        max_time : float
            The maximum number of seconds the interactions can take. If it is surpassed, the loop is interrupted.
            By default, no time limit is imposed.

        Returns
        -------
        int
            The time step at which the training has been interrupted due to the time constraint. If the constraint has
            been respected, -1 is returned.
        Dict[str, float]
            The performance indicators computed at the end of the interactions.
        """

        if max_time == np.inf:
            enforce_time_constraint = False
        else:
            enforce_time_constraint = self._enforce_time_constraint

        assert (
            type(log_every) == int
        ), f"The log_every variable should be an integer, received value: {log_every}."
        log_every = -1 if log_every == 0 else log_every

        # Reset the visitation count of the MDP
        self._mdp.reset_visitation_counts()

        self._reset_run_variables()
        self._max_time = max_time

        ts = self._mdp.reset()
        first_before_new_episode_timer = time()
        if enforce_time_constraint and self.remaining_time < np.inf:
            self._limit_update_time(0, self._agent.before_start_interacting)
        else:
            self._agent.before_start_interacting()
        if config._DEBUG_LEVEL > 0:
            if self._is_training:
                do = f"before_start_interacting completed in {time() - first_before_new_episode_timer:.2f}."
            else:
                do = "before_start_interacting exceeded the time limit."
            process_debug_output(do)

        self._set_loop(T)
        for t in self._loop:
            if self._is_training and self.remaining_time < 0.5:
                self._limit_exceeded(t)

            # MDP step
            h = self._mdp.h
            action = self._agent.select_action(ts, h)
            new_ts = self._mdp.step(action)
            self.actions_sequence.append(new_ts.reward)

            # Single step agent update
            if self._is_training:
                if enforce_time_constraint and self.remaining_time < np.inf:
                    self._limit_update_time(
                        t,
                        lambda: self._agent.step_update(ts, action, new_ts, h),
                    )
                else:
                    self._agent.step_update(ts, action, new_ts, h)

            # End of episode agent update
            if self._is_training and self._agent.is_episode_end(ts, action, new_ts, h):
                if enforce_time_constraint and self.remaining_time < np.inf:
                    self._limit_update_time(t, self._agent.episode_end_update)
                else:
                    self._agent.episode_end_update()

            if t > 0 and log_every > 0 and t % log_every == 0:
                # Log the performance of the agent
                self._update_performance_logs(t)
                self._n_steps_since_last_log = 0

                # User defined custom log
                self._agent.agent_logs()

                # Verbose logging
                self._update_user_loggings(t)

                # Storing the latest regrets
                self._latest_expected_regrets.append(self._normalized_regret)
                if (
                    len(self._latest_expected_regrets)
                    > self._n_steps_to_check_for_agent_optimality
                ):
                    self._latest_expected_regrets.pop(0)

                # Stop training if the agent has confidently reached the optimal policy
                if self._is_training and t > 0.2 * T and self._is_policy_optimal():
                    if type(self._loop) == tqdm.std.tqdm:
                        self._verbose_postfix["is_training"] = f"No, optimal at {t}"
                    self._is_training = False

            self._n_steps_since_last_log += 1
            self._cumulative_reward += new_ts.reward
            ts = new_ts

            # Resetting episodic MDPs
            if self._mdp.is_episodic() and new_ts.last():
                assert self._mdp.necessary_reset or t == T - 2
                ts = self._mdp.reset()
                self._n_episodes += 1

        self._update_performance_logs(t)
        self.logger.close()
        return self._last_training_step, self._last_logs

    def _reset_run_variables(self):
        self._cumulative_reward = 0.0
        self._cumulative_regret = 0.0
        self._normalized_cumulative_regret = 0.0
        self._random_cumulative_expected_reward = 0.0
        self._random_cumulative_regret = 0.0
        self._normalized_random_cumulative_regret = 0.0
        self._cumulative_expected_reward_agent = 0.0

        self._verbose = False
        self._verbose_postfix = dict(is_training="True")
        self._is_training = True
        self._n_steps_since_last_log = 0
        self._last_training_step = -1
        self._n_episodes = 0
        self._last_logs = None
        self._past_logs = None
        self._cached_episodic_regrets = None
        self._cached_continuous_regrets = None
        self._latest_expected_regrets = []

        # Cache the regrets for the baseline agents
        if self._episodic:

            # Random agent regret
            self._episodic_regret_random_agent = (
                self._mdp.episodic_optimal_average_reward
                - self._mdp.episodic_random_average_reward
            )
            self._episodic_normalized_regret_random_agent = (
                self._episodic_regret_random_agent
                / (
                    self._mdp.episodic_optimal_average_reward
                    - self._mdp.episodic_worst_average_reward
                )
            )

            # Worst agent regret
            self._episodic_regret_worst_agent = (
                self._mdp.episodic_optimal_average_reward
                - self._mdp.episodic_worst_average_reward
            )
            self._episodic_normalized_regret_worst_agent = (
                self._episodic_regret_worst_agent
                / (
                    self._mdp.episodic_optimal_average_reward
                    - self._mdp.episodic_worst_average_reward
                )
            )

            # Reward normalizer
            self._cumulative_reward_normalizer = lambda t, cr: (
                cr - t * self._mdp.episodic_worst_average_reward
            ) / (
                self._mdp.episodic_optimal_average_reward
                - self._mdp.episodic_worst_average_reward
            )
        else:

            # Random agent regret
            self._regret_random_agent = (
                self._mdp.optimal_average_reward - self._mdp.random_average_reward
            )
            self._normalized_regret_random_agent = self._regret_random_agent / (
                self._mdp.optimal_average_reward - self._mdp.worst_average_reward
            )

            # Worst agent regret
            self._regret_worst_agent = (
                self._mdp.optimal_average_reward - self._mdp.worst_average_reward
            )
            self._normalized_regret_worst_agent = self._regret_worst_agent / (
                self._mdp.optimal_average_reward - self._mdp.worst_average_reward
            )

            assert (
                self._mdp.optimal_average_reward - self._mdp.worst_average_reward
                > 0.0002
            ), type(self._mdp).__name__ + str(self._mdp.parameters)

            self._cumulative_reward_normalizer = lambda t, cr: (
                cr - t * self._mdp.worst_average_reward
            ) / (self._mdp.optimal_average_reward - self._mdp.worst_average_reward)

        self.logger.reset()
        self._mdp_loop_timer = time()
        self._verbose_time = time()

    def _update_performance_logs(self, t: int):
        self._compute_performance_indicators(t + 1)

        self._last_logs = dict(
            steps=t,
            cumulative_regret=self._cumulative_regret,
            cumulative_reward=self._cumulative_reward,
            cumulative_expected_reward=self._cumulative_expected_reward_agent,
            normalized_cumulative_regret=self._normalized_cumulative_regret,
            normalized_cumulative_reward=self._cumulative_reward_normalizer(
                t, self._cumulative_reward
            ),
            normalized_cumulative_expected_reward=self._cumulative_reward_normalizer(
                t, self._cumulative_expected_reward_agent
            ),
            random_cumulative_regret=self._cumulative_regret_random_agent,
            random_cumulative_expected_reward=self._cumulative_reward_random_agent,
            random_normalized_cumulative_regret=self._normalized_cumulative_regret_random_agent,
            random_normalized_cumulative_expected_reward=self._cumulative_reward_normalizer(
                t, self._cumulative_reward_random_agent
            ),
            worst_cumulative_regret=self._cumulative_regret_worst_agent,
            worst_cumulative_expected_reward=self._cumulative_reward_worst_agent,
            worst_normalized_cumulative_regret=self._normalized_cumulative_regret_worst_agent,
            worst_normalized_cumulative_expected_reward=self._cumulative_reward_normalizer(
                t, self._cumulative_reward_worst_agent
            ),
            optimal_cumulative_expected_reward=self._cumulative_reward_optimal_agent,
            optimal_normalized_cumulative_expected_reward=self._cumulative_reward_normalizer(
                t, self._cumulative_reward_optimal_agent
            ),
            steps_per_second=t / (time() - self._mdp_loop_timer),
        )

        # Communicate the indicators to the logger with a maximum of five digits
        a = toolz.valmap(lambda x: np.round(x, 5), self._last_logs)
        self.logger.write(a)

    def _compute_regrets(self):
        if self._episodic:
            return self._compute_episodic_regret()
        return self._compute_continuous_regret()

    def _compute_performance_indicators(self, t: int):
        self._compute_regrets()

        if self._episodic:
            # Random agent (regret)
            self._cumulative_regret_random_agent = (
                self._episodic_regret_random_agent * t
            )
            self._normalized_cumulative_regret_random_agent = (
                self._episodic_normalized_regret_random_agent * t
            )

            # Worst agent (regret)
            self._cumulative_regret_worst_agent = self._episodic_regret_worst_agent * t
            self._normalized_cumulative_regret_worst_agent = (
                self._episodic_normalized_regret_worst_agent * t
            )

            # Random agent (reward)
            self._cumulative_reward_random_agent = (
                self._mdp.episodic_random_average_reward * t
            )

            # Worst agent (reward)
            self._cumulative_reward_worst_agent = (
                self._mdp.episodic_worst_average_reward * t
            )

            # Optimal agent (reward)
            self._cumulative_reward_optimal_agent = (
                self._mdp.episodic_optimal_average_reward * t
            )

        else:
            # Random agent (regret)
            self._cumulative_regret_random_agent = self._regret_random_agent * t
            self._normalized_cumulative_regret_random_agent = (
                self._normalized_regret_random_agent * t
            )

            # Worst agent (regret)
            self._cumulative_regret_worst_agent = self._regret_worst_agent * t
            self._normalized_cumulative_regret_worst_agent = (
                self._normalized_regret_worst_agent * t
            )

            # Random agent (reward)
            self._cumulative_reward_random_agent = self._mdp.random_average_reward * t

            # Worst agent (reward)
            self._cumulative_reward_worst_agent = self._mdp.worst_average_reward * t

            # Optimal agent (reward)
            self._cumulative_reward_optimal_agent = self._mdp.optimal_average_reward * t

        # Avoid numerical errors that lead to negative regrets
        assert (
            self._regret >= 0.0
        ), f"{self._regret} on {type(self._mdp).__name__} {self._mdp.parameters} for policy {self._agent.current_optimal_stochastic_policy}"
        assert self._normalized_regret >= 0.0, self._normalized_regret

        self._cumulative_regret += self._regret * self._n_steps_since_last_log
        self._normalized_cumulative_regret += (
            self._normalized_regret * self._n_steps_since_last_log
        )
        self._cumulative_expected_reward_agent += (
            self._agent_average_reward * self._n_steps_since_last_log
        )

    @property
    def _agent_average_reward(self):
        if self._episodic:
            return self._episodic_agent_average_reward / self._mdp.H
        return self._agent_continuous_average_reward

    def _compute_continuous_regret(self):
        if not self._is_training:
            if self._cached_continuous_regrets is None:
                self._cached_continuous_regrets = self._get_continuous_regrets()
            self._regret, self._normalized_regret = self._cached_continuous_regrets
        else:
            self._regret, self._normalized_regret = self._get_continuous_regrets()

    def _get_continuous_regrets(self):
        self._agent_continuous_average_reward = get_average_reward(
            self._mdp.T,
            self._mdp.R,
            self._agent.current_optimal_stochastic_policy,
            [(self._mdp.node_to_index[self._mdp.cur_node], 1.0)],
        )

        r = self._mdp.optimal_average_reward - self._agent_continuous_average_reward
        if np.isclose(r, 0.0, atol=1e-3):
            r = 0.0
        if r < 0:
            r = 0
        nr = r / (self._mdp.optimal_average_reward - self._mdp.worst_average_reward)
        return r, nr

    def _compute_episodic_regret(self):
        if not self._is_training:
            # If the agent is not training, the policy will not change, so we can cache and reuse the regret for each
            # given starting state.
            if self._cached_episodic_regrets is None:
                Rs, epi_agent_ar = get_episodic_regrets_and_average_reward_at_time_zero(
                    self._mdp.H,
                    self._mdp.T,
                    self._mdp.R,
                    self._agent.current_optimal_stochastic_policy,
                    self._mdp.starting_state_distribution,
                    self._mdp.optimal_value_functions[1],
                )
                self._episodic_agent_average_reward = epi_agent_ar
                self._cached_episodic_regrets = {
                    n: (
                        Rs[self._mdp.node_to_index[n]] / self._mdp.H,  # expected regret
                        Rs[self._mdp.node_to_index[n]]  # normalized expected regret
                        / self._mdp.get_minimal_regret_for_starting_node(n),
                    )
                    for n in self._mdp.starting_nodes
                }
            self._regret, self._normalized_regret = self._cached_episodic_regrets[
                self._mdp.last_starting_node
            ]
        else:
            Rs, epi_agent_ar = get_episodic_regrets_and_average_reward_at_time_zero(
                self._mdp.H,
                self._mdp.T,
                self._mdp.R,
                self._agent.current_optimal_stochastic_policy,
                self._mdp.starting_state_distribution,
                self._mdp.optimal_value_functions[1],
            )
            self._episodic_agent_average_reward = epi_agent_ar
            self._regret = (
                Rs[self._mdp.node_to_index[self._mdp.last_starting_node]] / self._mdp.H
            )
            self._normalized_regret = (
                self._regret
                / self._mdp.get_minimal_regret_for_starting_node(
                    self._mdp.last_starting_node
                )
                * self._mdp.H
            )

    def _is_policy_optimal(self) -> bool:
        if (
            len(self._latest_expected_regrets)
            == self._n_steps_to_check_for_agent_optimality
            and np.isclose(
                0,
                self._latest_expected_regrets,
                atol=1e-4 if self._mdp.is_episodic() else 1e-5,
            ).all()
        ):
            # After we get an empirical suggestion that the policy may be optimal, we check whether the expected regret
            # is zero as well
            self._compute_regrets()
            return np.isclose(self._normalized_regret, 0).all()
        return False

    def _set_loop(self, T: int) -> Iterable:
        """
        creates a loop lasting for T steps taking into account the verbosity level.
        """
        if config.VERBOSE_LEVEL != 0:
            desc = f"Experiment loop {type(self._agent).__name__}@{type(self._mdp).__name__}"
            if type(config.VERBOSE_LEVEL) == str:
                self.s = io.StringIO()  # we need this reference
                self._loop = trange(T, desc=desc, file=self.s, mininterval=5)
            else:
                self._loop = trange(T, desc=desc, mininterval=5)
            self._verbose = True
        else:
            self._loop = range(T)

    def _update_user_loggings(self, t: int):
        if self._verbose:  # and time() - self._verbose_time > 5:
            self._verbose_postfix["Instantaneous normalized regret"] = np.round(
                self._normalized_regret / t, 8
            )
            self._loop.set_postfix(self._verbose_postfix, refresh=False)

    def plot(
        self,
        indicator: str = "cumulative_regret",
        ax=None,
        baselines=("random", "worst", "optimal"),
        label=None,
    ):
        """
        plots the values of the indicator obtained by the agent during the interactions along with the baseline values.

        Parameters
        ----------
        indicator : str
            The code name of the performance indicator that will be shown in the plot. Check `MDPLoop.get_indicators()`
            to get a list of the available indicators. By default, the 'cumulative_regret' is shown.
        ax : plt.Axes
            The ax object where the plot will be put. By default, a new axis is created.
        baselines : List[str]
            The baselines to be included in the plot. Check `MDPLoop.get_baselines()` to get a list of the available
            baselines. By default, all baselines are shown.
        label : str
            The label to be given to the agent. By default, a cleaned version of the agent class name is used.
        """

        show = ax is None
        if ax is None:
            fig, ax = plt.subplots()

        assert indicator in self.get_indicators(), (
            f"{indicator} is not an indicator. The indicators available are: "
            + ",".join(self.get_indicators())
            + "."
        )

        df_e = pd.DataFrame(self.logger.data)
        time_steps = [0] + df_e.loc[:, "steps"].tolist()
        ax.plot(
            time_steps[1:] if indicator == "steps_per_second" else time_steps,
            ([] if indicator == "steps_per_second" else [0])
            + df_e.loc[:, indicator].tolist(),
            label=clear_agent_mdp_class_name(type(self._agent).__name__)
            if label is None
            else label,
        )
        ax.set_ylabel(indicator.replace("_", " ").capitalize())

        for b in baselines:
            indicator = indicator.replace(
                "cumulative_reward", "cumulative_expected_reward"
            )
            if b + "_" + indicator in self.get_baseline_indicators():
                ax.plot(
                    time_steps,
                    [0] + df_e.loc[:, b + "_" + indicator].tolist(),
                    label=b.capitalize(),
                    # alpha=0.9,
                    linestyle=(0, (5, 10)),
                    color="darkolivegreen"
                    if "optimal" in b
                    else ("darkred" if "worst" in b else "darkslategray"),
                    linewidth=2,
                )

        ax.set_xlabel("time step")
        ax.legend()
        if show:
            plt.tight_layout()
            plt.show()
```
class MDPLoop:
The `MDPLoop` is the object in charge of the agent/MDP interactions and the computation of the performance indicators.
It also provides limited plotting functionalities.
MDPLoop(
    mdp: Union[colosseum.mdp.base.BaseMDP, colosseum.mdp.base_finite.EpisodicMDP, colosseum.mdp.base_infinite.ContinuousMDP],
    agent: colosseum.agent.agents.base.BaseAgent,
    logger: colosseum.utils.acme.base_logger.Logger = None,
    n_log_intervals_to_check_for_agent_optimality: int = 10,
    enforce_time_constraint: bool = True,
)
Parameters
- mdp (Union["BaseMDP", "EpisodicMDP", "ContinuousMDP"]): The MDP.
- agent (BaseAgent): The agent.
- logger (Logger): The logger where the results of the interaction between the agent and the MDP are stored. By default, the `InMemoryLogger` is used.
- n_log_intervals_to_check_for_agent_optimality (int): The number of most recent logging intervals over which the policy is checked for optimality. By default, the last ten logged regrets are used.
- enforce_time_constraint (bool): If True, the computational time constraint given in the `run` function is enforced through multithreading. By default, it is enforced.
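A sketch of the non-default constructor options, assuming `mdp` and `agent` are an already-built compatible pair (placeholders here). `InMemoryLogger` is the default logger, shown explicitly for clarity.

```python
from colosseum.experiment.agent_mdp_interaction import MDPLoop
from colosseum.utils.acme import InMemoryLogger

# `mdp` and `agent` are assumed to be a compatible (both episodic or both
# continuous) MDP/agent pair built elsewhere.
loop = MDPLoop(
    mdp,
    agent,
    logger=InMemoryLogger(),                           # default logger, made explicit
    n_log_intervals_to_check_for_agent_optimality=10,  # optimality check window (in logging intervals)
    enforce_time_constraint=False,                     # skip the timeout wrapper around agent updates
)
```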
@staticmethod
def get_indicators() -> List[str]:
Returns
- List[str]: The code names for the indicators that are computed by the MDPLoop.
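The `normalized_*` indicators rescale their cumulative counterparts by the worst and optimal average rewards of the MDP. Restating the `_cumulative_reward_normalizer` lambda from the source above (episodic MDPs use the episodic averages instead), the normalization is

$$
\text{normalized\_cumulative\_reward}(t) \;=\; \frac{C(t) - t\,\bar r_{\text{worst}}}{\bar r_{\text{optimal}} - \bar r_{\text{worst}}},
$$

where C(t) is the agent's cumulative (expected) reward after t time steps and r̄_worst, r̄_optimal are the MDP's worst and optimal average rewards.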
@staticmethod
def get_baseline_indicators() -> List[str]:
Returns
- List[str]: The code names for the baseline indicators that are computed by the MDPLoop.
@staticmethod
def get_baselines() -> Set[str]:
Returns
- Set[str]: The baselines available for comparison.
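Since every baseline indicator is named `<baseline>_<indicator>`, the set of baselines is simply the set of prefixes of `get_baseline_indicators()`. A quick check of that relationship, assuming the indicator lists shown in the source above:

```python
from colosseum.experiment.agent_mdp_interaction import MDPLoop

assert MDPLoop.get_baselines() == {"random", "worst", "optimal"}

# Every baseline indicator is a baseline name followed by a regular indicator name.
for name in MDPLoop.get_baseline_indicators():
    baseline, _, indicator = name.partition("_")
    assert baseline in MDPLoop.get_baselines()
    assert indicator in MDPLoop.get_indicators()
```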
@staticmethod
def get_baselines_color_dict() -> Dict[str, str]:
Returns
- Dict[str, str]: The color associated by default with each baseline.
@staticmethod
def get_baselines_style_dict():
Returns
- Dict[str, tuple]: The line style associated by default with each baseline.
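The color and line-style dictionaries can be reused when drawing baselines manually on an externally created matplotlib axis. A sketch, assuming `loop` is an `MDPLoop` whose `run` has already been called:

```python
import matplotlib.pyplot as plt
import pandas as pd

from colosseum.experiment.agent_mdp_interaction import MDPLoop

# `loop` is an MDPLoop on which `run` has already been called (assumption).
df = pd.DataFrame(loop.logger.data)

fig, ax = plt.subplots()
colors = MDPLoop.get_baselines_color_dict()
styles = MDPLoop.get_baselines_style_dict()
# Only the random and worst baselines have a cumulative_regret indicator.
for b in ("random", "worst"):
    ax.plot(
        df["steps"],
        df[f"{b}_cumulative_regret"],
        color=colors[b],
        linestyle=styles[b],
        label=b.capitalize(),
    )
ax.legend()
plt.show()
```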
def run(self, T: int, log_every: int = -1, max_time: float = inf) -> Tuple[int, Dict[str, float]]:
runs the agent/MDP interactions.
Parameters
- T (int): The number of total interactions between the agent and the MDP.
- log_every (int): The number of time steps between consecutive computations of the performance indicators. By default, the indicators are only computed at the last time step.
- max_time (float): The maximum number of seconds the interactions can take. If it is surpassed, the loop is interrupted. By default, no time limit is imposed.
Returns
- int: The time step at which the training has been interrupted due to the time constraint. If the constraint has been respected, -1 is returned.
- Dict[str, float]: The performance indicators computed at the end of the interactions.
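A sketch of how the two return values can be interpreted; the construction of `loop` is assumed to have happened elsewhere.

```python
# `loop` is an already constructed MDPLoop (assumption).
last_training_step, last_logs = loop.run(T=50_000, log_every=500, max_time=60.0)

if last_training_step == -1:
    print("The time constraint was respected for the whole run.")
else:
    print(f"Training was interrupted at step {last_training_step}.")

# `last_logs` contains the indicators (and baselines) from the final log, e.g.:
print(last_logs["normalized_cumulative_regret"])
print(last_logs["steps_per_second"])
```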
def plot(self, indicator: str = 'cumulative_regret', ax=None, baselines=('random', 'worst', 'optimal'), label=None):
plots the values of the indicator obtained by the agent during the interactions along with the baseline values.
Parameters
- indicator (str): The code name of the performance indicator that will be shown in the plot. Check `MDPLoop.get_indicators()` to get a list of the available indicators. By default, the 'cumulative_regret' is shown.
- ax (plt.Axes): The ax object where the plot will be put. By default, a new axis is created.
- baselines (List[str]): The baselines to be included in the plot. Check `MDPLoop.get_baselines()` to get a list of the available baselines. By default, all baselines are shown.
- label (str): The label to be given to the agent. By default, a cleaned version of the agent class name is used.
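A sketch of plotting two indicators side by side on user-provided axes after `run` has completed; the `loop` object is assumed to exist.

```python
import matplotlib.pyplot as plt

fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(10, 4))

# `loop` is an MDPLoop on which `run` has already been called (assumption).
loop.plot("cumulative_regret", ax=ax1)
loop.plot(
    "normalized_cumulative_reward",
    ax=ax2,
    baselines=("optimal",),  # only show the optimal baseline
    label="My agent",
)

plt.tight_layout()
plt.show()
```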