colosseum.experiment.agent_mdp_interaction

  1import io
  2from time import time
  3from typing import TYPE_CHECKING, Dict, Iterable, List, Tuple, Union, Set
  4
  5import numpy as np
  6import pandas as pd
  7import seaborn as sns
  8import toolz
  9import tqdm
 10from matplotlib import pyplot as plt
 11from tqdm import trange
 12from wrapt_timeout_decorator import timeout
 13
 14from colosseum import config
 15from colosseum.config import process_debug_output
 16from colosseum.emission_maps import Tabular
 17from colosseum.experiment.indicators import (
 18    get_episodic_regrets_and_average_reward_at_time_zero,
 19)
 20from colosseum.mdp.utils.markov_chain import get_average_reward
 21from colosseum.utils.acme import InMemoryLogger
 22from colosseum.utils.acme.base_logger import Logger
 23from colosseum.utils.formatter import clear_agent_mdp_class_name
 24
 25if TYPE_CHECKING:
 26    from colosseum.mdp import ContinuousMDP, EpisodicMDP, BaseMDP
 27    from colosseum.agent.agents.base import BaseAgent
 28
 29sns.set_theme()
 30
 31
 32class MDPLoop:
 33    """
 34    The `MDPLoop` is the object in charge of the agent/MDP interactions and the computation of the performance indicators.
 35    It also provides limited plotting functionalities.
 36    """
 37
 38    @staticmethod
 39    def get_indicators() -> List[str]:
 40        """
 41        Returns
 42        -------
 43        List[str]
 44            The code names for the indicators that are computed by the MDPLoop.
 45        """
 46        return [
 47            "cumulative_expected_reward",
 48            "cumulative_regret",
 49            "cumulative_reward",
 50            "normalized_cumulative_expected_reward",
 51            "normalized_cumulative_regret",
 52            "normalized_cumulative_reward",
 53            "steps_per_second",
 54        ]
 55
 56    @staticmethod
 57    def get_baseline_indicators() -> List[str]:
 58        """
 59        Returns
 60        -------
 61        List[str]
 62            The code names for the baseline indicators that are computed by the MDPLoop.
 63        """
 64        return [
 65            "random_cumulative_regret",
 66            "random_cumulative_expected_reward",
 67            "random_normalized_cumulative_regret",
 68            "random_normalized_cumulative_expected_reward",
 69            "optimal_cumulative_expected_reward",
 70            "optimal_normalized_cumulative_expected_reward",
 71            "worst_cumulative_regret",
 72            "worst_cumulative_expected_reward",
 73            "worst_normalized_cumulative_regret",
 74            "worst_normalized_cumulative_expected_reward",
 75        ]
 76
 77    @staticmethod
 78    def get_baselines() -> Set[str]:
 79        """
 80        Returns
 81        -------
 82        Set[str]
 83            The baselines available for comparison.
 84        """
 85        return set(b[: b.find("_")] for b in MDPLoop.get_baseline_indicators())
 86
 87    @staticmethod
 88    def get_baselines_color_dict() -> Dict[str, str]:
 89        """
 90        Returns
 91        -------
 92        Dict[str, str]
 93            The color associated with each baseline by default.
 94        """
 95        return dict(random="black", worst="crimson", optimal="gold")
 96
 97    @staticmethod
 98    def get_baselines_style_dict():
 99        """
100        Returns
101        -------
102        Dict[str, str]
103            The line style associated with each baseline by default.
104        """
105        return dict(random=(0, (6, 12)), worst=(9, (6, 12)), optimal=(0, (6, 12)))
106
107    def __init__(
108        self,
109        mdp: Union["BaseMDP", "EpisodicMDP", "ContinuousMDP"],
110        agent: "BaseAgent",
111        logger: Logger = None,
112        n_log_intervals_to_check_for_agent_optimality: int = 10,
113        enforce_time_constraint: bool = True,
114    ) -> object:
115        """
116        Parameters
117        ----------
118        mdp: Union["EpisodicMDP", "ContinuousMDP"]
119            The MDP.
120        agent : BaseAgent
121            The agent.
122        logger : Logger
123            The logger where the results of the interaction between the agent and the MDP are stored. By default, the
124            `InMemoryLogger` is used.
125        n_log_intervals_to_check_for_agent_optimality : int
126            The number of recent logging intervals used to check whether the policy has reached optimality. By default,
127            the last ten logging intervals are considered.
128        enforce_time_constraint : bool
129            If True, the computational time constraint given in the `run` function is enforced through multithreading.
130            By default, it is enforced.
131        """
132
133        if logger is None:
134            logger = InMemoryLogger()
135
136        self.logger = logger
137        self._enforce_time_constraint = enforce_time_constraint
138        self._mdp = mdp
139        self._agent = agent
140        self._episodic = self._mdp.is_episodic()
141        self._n_steps_to_check_for_agent_optimality = (
142            n_log_intervals_to_check_for_agent_optimality
143        )
144        assert self._episodic == agent.is_episodic()
145        assert self._agent.is_emission_map_accepted(
146            Tabular if self._mdp.emission_map is None else self._mdp.emission_map
147        )
148        self.actions_sequence = []
149
150    @property
151    def remaining_time(self) -> float:
152        """
153        Returns
154        -------
155        float
156            The remaining computational time for training the agent.
157        """
158        return self._max_time - (time() - self._mdp_loop_timer)
159
160    def _limit_update_time(self, t, f):
161        try:
162            if self.remaining_time < 0.5:
163                raise TimeoutError()
164            timeout(self.remaining_time)(f)()
165        except (TimeoutError, SystemError):
166            if config._DEBUG_LEVEL > 0:
167                print("Time exceeded with function ", f)
168            self._limit_exceeded(t)
169
170    def _limit_exceeded(self, t):
171        self._is_training = False
172        self._last_training_step = t
173        if config._DEBUG_LEVEL > 0:
174            do = f"Stopped training at {time() - self._mdp_loop_timer:.2f}"
175            process_debug_output(do)
176        if self._verbose:
177            self._verbose_postfix["is_training"] = f"No, time exhausted at {t}"
178
179    def run(
180        self,
181        T: int,
182        log_every: int = -1,
183        max_time: float = np.inf,
184    ) -> Tuple[int, Dict[str, float]]:
185        """
186        runs the agent/MDP interactions.
187
188        Parameters
189        ----------
190        T : int
191            The number of total interactions between the agent and the MDP.
192        log_every : int
193            The number of time steps between consecutive computations of the performance indicators. By default, they
194            are computed only at the last time step.
195        max_time : float
196            The maximum number of seconds the interactions can take. If it is surpassed then the loop is interrupted.
197            By default, the maximum given time is infinite.
198
199        Returns
200        -------
201        int
202            The time step at which training was interrupted due to the time constraint, or -1 if the constraint was
203            respected.
204        Dict[str, float]
205            The performance indicators computed at the end of the interactions.
206        """
207
208        if max_time == np.inf:
209            enforce_time_constraint = False
210        else:
211            enforce_time_constraint = self._enforce_time_constraint
212
213        assert (
214            type(log_every) == int
215        ), f"The log_every variable should be an integer, received value: {log_every}."
216        log_every = -1 if log_every == 0 else log_every
217
218        # Reset the visitation count of the MDP
219        self._mdp.reset_visitation_counts()
220
221        self._reset_run_variables()
222        self._max_time = max_time
223
224        ts = self._mdp.reset()
225        first_before_new_episode_timer = time()
226        if enforce_time_constraint and self.remaining_time < np.inf:
227            self._limit_update_time(0, self._agent.before_start_interacting)
228        else:
229            self._agent.before_start_interacting()
230        if config._DEBUG_LEVEL > 0:
231            if self._is_training:
232                do = f"before_start_interacting completed in {time() - first_before_new_episode_timer:.2f}."
233            else:
234                do = "before_start_interacting exceeded the time limit."
235            process_debug_output(do)
236
237        self._set_loop(T)
238        for t in self._loop:
239            if self._is_training and self.remaining_time < 0.5:
240                self._limit_exceeded(t)
241
242            # MDP step
243            h = self._mdp.h
244            action = self._agent.select_action(ts, h)
245            new_ts = self._mdp.step(action)
246            self.actions_sequence.append(new_ts.reward)
247
248            # Single step agent update
249            if self._is_training:
250                if enforce_time_constraint and self.remaining_time < np.inf:
251                    self._limit_update_time(
252                        t,
253                        lambda: self._agent.step_update(ts, action, new_ts, h),
254                    )
255                else:
256                    self._agent.step_update(ts, action, new_ts, h)
257
258            # End of episode agent update
259            if self._is_training and self._agent.is_episode_end(ts, action, new_ts, h):
260                if enforce_time_constraint and self.remaining_time < np.inf:
261                    self._limit_update_time(t, self._agent.episode_end_update)
262                else:
263                    self._agent.episode_end_update()
264
265            if t > 0 and log_every > 0 and t % log_every == 0:
266                # Log the performance of the agent
267                self._update_performance_logs(t)
268                self._n_steps_since_last_log = 0
269
270                # User defined custom log
271                self._agent.agent_logs()
272
273                # Verbose loggings
274                self._update_user_loggings(t)
275
276                # Storing the latest regrets
277                self._latest_expected_regrets.append(self._normalized_regret)
278                if (
279                    len(self._latest_expected_regrets)
280                    > self._n_steps_to_check_for_agent_optimality
281                ):
282                    self._latest_expected_regrets.pop(0)
283
284                # Stop training if the agent has confidently reached the optimal policy
285                if self._is_training and t > 0.2 * T and self._is_policy_optimal():
286                    if type(self._loop) == tqdm.std.tqdm:
287                        self._verbose_postfix["is_training"] = f"No, optimal at {t}"
288                    self._is_training = False
289
290            self._n_steps_since_last_log += 1
291            self._cumulative_reward += new_ts.reward
292            ts = new_ts
293
294            # Resetting episodic MDPs
295            if self._mdp.is_episodic() and new_ts.last():
296                assert self._mdp.necessary_reset or t == T - 2
297                ts = self._mdp.reset()
298                self._n_episodes += 1
299
300        self._update_performance_logs(t)
301        self.logger.close()
302        return self._last_training_step, self._last_logs
303
304    def _reset_run_variables(self):
305        self._cumulative_reward = 0.0
306        self._cumulative_regret = 0.0
307        self._normalized_cumulative_regret = 0.0
308        self._random_cumulative_expected_reward = 0.0
309        self._random_cumulative_regret = 0.0
310        self._normalized_random_cumulative_regret = 0.0
311        self._cumulative_expected_reward_agent = 0.0
312
313        self._verbose = False
314        self._verbose_postfix = dict(is_training="True")
315        self._is_training = True
316        self._n_steps_since_last_log = 0
317        self._last_training_step = -1
318        self._n_episodes = 0
319        self._last_logs = None
320        self._past_logs = None
321        self._cached_episodic_regrets = None
322        self._cached_continuous_regrets = None
323        self._latest_expected_regrets = []
324
325        # Cache the regret for the random agent
326        if self._episodic:
327
328            # Random agent regret
329            self._episodic_regret_random_agent = (
330                self._mdp.episodic_optimal_average_reward
331                - self._mdp.episodic_random_average_reward
332            )
333            self._episodic_normalized_regret_random_agent = (
334                self._episodic_regret_random_agent
335                / (
336                    self._mdp.episodic_optimal_average_reward
337                    - self._mdp.episodic_worst_average_reward
338                )
339            )
340
341            # Worst agent regret
342            self._episodic_regret_worst_agent = (
343                self._mdp.episodic_optimal_average_reward
344                - self._mdp.episodic_worst_average_reward
345            )
346            self._episodic_normalized_regret_worst_agent = (
347                self._episodic_regret_worst_agent
348                / (
349                    self._mdp.episodic_optimal_average_reward
350                    - self._mdp.episodic_worst_average_reward
351                )
352            )
353
354            # Reward normalized
355            self._cumulative_reward_normalizer = lambda t, cr: (
356                cr - t * self._mdp.episodic_worst_average_reward
357            ) / (
358                self._mdp.episodic_optimal_average_reward
359                - self._mdp.episodic_worst_average_reward
360            )
361        else:
362
363            # Random agent regret
364            self._regret_random_agent = (
365                self._mdp.optimal_average_reward - self._mdp.random_average_reward
366            )
367            self._normalized_regret_random_agent = self._regret_random_agent / (
368                self._mdp.optimal_average_reward - self._mdp.worst_average_reward
369            )
370
371            # Worst agent regret
372            self._regret_worst_agent = (
373                self._mdp.optimal_average_reward - self._mdp.worst_average_reward
374            )
375            self._normalized_regret_worst_agent = self._regret_worst_agent / (
376                self._mdp.optimal_average_reward - self._mdp.worst_average_reward
377            )
378
379            assert (
380                self._mdp.optimal_average_reward - self._mdp.worst_average_reward
381                > 0.0002
382            ), type(self._mdp).__name__ + str(self._mdp.parameters)
383
384            self._cumulative_reward_normalizer = lambda t, cr: (
385                cr - t * self._mdp.worst_average_reward
386            ) / (self._mdp.optimal_average_reward - self._mdp.worst_average_reward)
387
388        self.logger.reset()
389        self._mdp_loop_timer = time()
390        self._verbose_time = time()
391
392    def _update_performance_logs(self, t: int):
393        self._compute_performance_indicators(t + 1)
394
395        self._last_logs = dict(
396            steps=t,
397            cumulative_regret=self._cumulative_regret,
398            cumulative_reward=self._cumulative_reward,
399            cumulative_expected_reward=self._cumulative_expected_reward_agent,
400            normalized_cumulative_regret=self._normalized_cumulative_regret,
401            normalized_cumulative_reward=self._cumulative_reward_normalizer(
402                t, self._cumulative_reward
403            ),
404            normalized_cumulative_expected_reward=self._cumulative_reward_normalizer(
405                t, self._cumulative_expected_reward_agent
406            ),
407            random_cumulative_regret=self._cumulative_regret_random_agent,
408            random_cumulative_expected_reward=self._cumulative_reward_random_agent,
409            random_normalized_cumulative_regret=self._normalized_cumulative_regret_random_agent,
410            random_normalized_cumulative_expected_reward=self._cumulative_reward_normalizer(
411                t, self._cumulative_reward_random_agent
412            ),
413            worst_cumulative_regret=self._cumulative_regret_worst_agent,
414            worst_cumulative_expected_reward=self._cumulative_reward_worst_agent,
415            worst_normalized_cumulative_regret=self._normalized_cumulative_regret_worst_agent,
416            worst_normalized_cumulative_expected_reward=self._cumulative_reward_normalizer(
417                t, self._cumulative_reward_worst_agent
418            ),
419            optimal_cumulative_expected_reward=self._cumulative_reward_optimal_agent,
420            optimal_normalized_cumulative_expected_reward=self._cumulative_reward_normalizer(
421                t, self._cumulative_reward_optimal_agent
422            ),
423            steps_per_second=t / (time() - self._mdp_loop_timer),
424        )
425
426        # Communicate the indicators to the logger with a maximum of five digits
427        a = toolz.valmap(lambda x: np.round(x, 5), self._last_logs)
428        self.logger.write(a)
429
430    def _compute_regrets(self):
431        if self._episodic:
432            return self._compute_episodic_regret()
433        return self._compute_continuous_regret()
434
435    def _compute_performance_indicators(self, t: int):
436        self._compute_regrets()
437
438        if self._episodic:
439            # Random agent (regret)
440            self._cumulative_regret_random_agent = (
441                self._episodic_regret_random_agent * t
442            )
443            self._normalized_cumulative_regret_random_agent = (
444                self._episodic_normalized_regret_random_agent * t
445            )
446
447            # Worst agent (regret)
448            self._cumulative_regret_worst_agent = self._episodic_regret_worst_agent * t
449            self._normalized_cumulative_regret_worst_agent = (
450                self._episodic_normalized_regret_worst_agent * t
451            )
452
453            # Random agent (reward)
454            self._cumulative_reward_random_agent = (
455                self._mdp.episodic_random_average_reward * t
456            )
457
458            # Worst agent (reward)
459            self._cumulative_reward_worst_agent = (
460                self._mdp.episodic_worst_average_reward * t
461            )
462
463            # Optimal agent (reward)
464            self._cumulative_reward_optimal_agent = (
465                self._mdp.episodic_optimal_average_reward * t
466            )
467
468        else:
469            # Random agent (regret)
470            self._cumulative_regret_random_agent = self._regret_random_agent * t
471            self._normalized_cumulative_regret_random_agent = (
472                self._normalized_regret_random_agent * t
473            )
474
475            # Worst agent (regret)
476            self._cumulative_regret_worst_agent = self._regret_worst_agent * t
477            self._normalized_cumulative_regret_worst_agent = (
478                self._normalized_regret_worst_agent * t
479            )
480
481            # Random agent (reward)
482            self._cumulative_reward_random_agent = self._mdp.random_average_reward * t
483
484            # Worst agent (reward)
485            self._cumulative_reward_worst_agent = self._mdp.worst_average_reward * t
486
487            # Optimal agent (reward)
488            self._cumulative_reward_optimal_agent = self._mdp.optimal_average_reward * t
489
490        # Guard against numerical errors that would lead to negative regrets
491        assert (
492            self._regret >= 0.0
493        ), f"{self._regret} on {type(self._mdp).__name__} {self._mdp.parameters} for policy {self._agent.current_optimal_stochastic_policy}"
494        assert self._normalized_regret >= 0.0, self._normalized_regret
495
496        self._cumulative_regret += self._regret * self._n_steps_since_last_log
497        self._normalized_cumulative_regret += (
498            self._normalized_regret * self._n_steps_since_last_log
499        )
500        self._cumulative_expected_reward_agent += (
501            self._agent_average_reward * self._n_steps_since_last_log
502        )
503
504    @property
505    def _agent_average_reward(self):
506        if self._episodic:
507            return self._episodic_agent_average_reward / self._mdp.H
508        return self._agent_continuous_average_reward
509
510    def _compute_continuous_regret(self):
511        if not self._is_training:
512            if self._cached_continuous_regrets is None:
513                self._cached_continuous_regrets = self._get_continuous_regrets()
514            self._regret, self._normalized_regret = self._cached_continuous_regrets
515        else:
516            self._regret, self._normalized_regret = self._get_continuous_regrets()
517
518    def _get_continuous_regrets(self):
519        self._agent_continuous_average_reward = get_average_reward(
520            self._mdp.T,
521            self._mdp.R,
522            self._agent.current_optimal_stochastic_policy,
523            [(self._mdp.node_to_index[self._mdp.cur_node], 1.0)],
524        )
525
526        r = self._mdp.optimal_average_reward - self._agent_continuous_average_reward
527        if np.isclose(r, 0.0, atol=1e-3):
528            r = 0.0
529        if r < 0:
530            r = 0
531        nr = r / (self._mdp.optimal_average_reward - self._mdp.worst_average_reward)
532        return r, nr
533
534    def _compute_episodic_regret(self):
535        if not self._is_training:
536            # If the agent is not training, the policy will not change, so we can cache and reuse the regret for each
537            # given starting state.
538            if self._cached_episodic_regrets is None:
539                Rs, epi_agent_ar = get_episodic_regrets_and_average_reward_at_time_zero(
540                    self._mdp.H,
541                    self._mdp.T,
542                    self._mdp.R,
543                    self._agent.current_optimal_stochastic_policy,
544                    self._mdp.starting_state_distribution,
545                    self._mdp.optimal_value_functions[1],
546                )
547                self._episodic_agent_average_reward = epi_agent_ar
548                self._cached_episodic_regrets = {
549                    n: (
550                        Rs[self._mdp.node_to_index[n]] / self._mdp.H,  # expected regret
551                        Rs[self._mdp.node_to_index[n]]  # normalized expected regret
552                        / self._mdp.get_minimal_regret_for_starting_node(n),
553                    )
554                    for n in self._mdp.starting_nodes
555                }
556            self._regret, self._normalized_regret = self._cached_episodic_regrets[
557                self._mdp.last_starting_node
558            ]
559        else:
560            Rs, epi_agent_ar = get_episodic_regrets_and_average_reward_at_time_zero(
561                self._mdp.H,
562                self._mdp.T,
563                self._mdp.R,
564                self._agent.current_optimal_stochastic_policy,
565                self._mdp.starting_state_distribution,
566                self._mdp.optimal_value_functions[1],
567            )
568            self._episodic_agent_average_reward = epi_agent_ar
569            self._regret = (
570                Rs[self._mdp.node_to_index[self._mdp.last_starting_node]] / self._mdp.H
571            )
572            self._normalized_regret = (
573                self._regret
574                / self._mdp.get_minimal_regret_for_starting_node(
575                    self._mdp.last_starting_node
576                )
577                * self._mdp.H
578            )
579
580    def _is_policy_optimal(self) -> bool:
581        if (
582            len(self._latest_expected_regrets)
583            == self._n_steps_to_check_for_agent_optimality
584            and np.isclose(
585                0,
586                self._latest_expected_regrets,
587                atol=1e-4 if self._mdp.is_episodic() else 1e-5,
588            ).all()
589        ):
590            # After we get an empirical suggestion that the policy may be optimal, we check whether the expected regret
591            # is zero as well.
592            self._compute_regrets()
593            return np.isclose(self._normalized_regret, 0).all()
594        return False
595
596    def _set_loop(self, T: int) -> Iterable:
597        """
598        creates a loop lasting for T steps, taking into account the verbosity level.
599        """
600        if config.VERBOSE_LEVEL != 0:
601            desc = f"Experiment loop {type(self._agent).__name__}@{type(self._mdp).__name__}"
602            if type(config.VERBOSE_LEVEL) == str:
603                self.s = io.StringIO()  # we need this reference
604                self._loop = trange(T, desc=desc, file=self.s, mininterval=5)
605            else:
606                self._loop = trange(T, desc=desc, mininterval=5)
607            self._verbose = True
608        else:
609            self._loop = range(T)
610
611    def _update_user_loggings(self, t: int):
612        if self._verbose:  # and time() - self._verbose_time > 5:
613            self._verbose_postfix["Instantaneous normalized regret"] = np.round(
614                self._normalized_regret / t, 8
615            )
616            self._loop.set_postfix(self._verbose_postfix, refresh=False)
617
618    def plot(
619        self,
620        indicator: str = "cumulative_regret",
621        ax=None,
622        baselines=("random", "worst", "optimal"),
623        label=None,
624    ):
625        """
626        plots the values of the indicator obtained by the agent during the interactions along with the baseline values.
627
628        Parameters
629        ----------
630        indicator : str
631            The code name of the performance indicator that will be shown in the plot. Check `MDPLoop.get_indicators()`
632            to get a list of the available indicators. By default, the 'cumulative_regret' is shown.
633        ax : plt.Axes
634            The ax object where the plot will be put. By default, a new axis is created.
635        baselines : List[str]
636            The baselines to be included in the plot. Check `MDPLoop.get_baselines()` to get a list of the available
637            baselines. By default, all baselines are shown.
638        label : str
639            The label to be given to the agent. By default, a cleaned version of the agent class name is used.
640        """
641
642        show = ax is None
643        if ax is None:
644            fig, ax = plt.subplots()
645
646        assert indicator in self.get_indicators(), (
647            f"{indicator} is not an indicator. The indicators available are: "
648            + ",".join(self.get_indicators())
649            + "."
650        )
651
652        df_e = pd.DataFrame(self.logger.data)
653        time_steps = [0] + df_e.loc[:, "steps"].tolist()
654        ax.plot(
655            time_steps[1:] if indicator == "steps_per_second" else time_steps,
656            ([] if indicator == "steps_per_second" else [0])
657            + df_e.loc[:, indicator].tolist(),
658            label=clear_agent_mdp_class_name(type(self._agent).__name__)
659            if label is None
660            else label,
661        )
662        ax.set_ylabel(indicator.replace("_", " ").capitalize())
663
664        for b in baselines:
665            indicator = indicator.replace(
666                "cumulative_reward", "cumulative_expected_reward"
667            )
668            if b + "_" + indicator in self.get_baseline_indicators():
669                ax.plot(
670                    time_steps,
671                    [0] + df_e.loc[:, b + "_" + indicator].tolist(),
672                    label=b.capitalize(),
673                    # alpha=0.9,
674                    linestyle=(0, (5, 10)),
675                    color="darkolivegreen"
676                    if "optimal" in b
677                    else ("darkred" if "worst" in b else "darkslategray"),
678                    linewidth=2,
679                )
680
681        ax.set_xlabel("time step")
682        ax.legend()
683        if show:
684            plt.tight_layout()
685            plt.show()

class MDPLoop:

The MDPLoop is the object in charge of the agent/MDP interactions and the computation of the performance indicators. It also provides limited plotting functionalities.
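
The following is a minimal usage sketch, not taken from the library source: MyEpisodicMDP and MyAgent are hypothetical stand-ins for concrete EpisodicMDP and BaseAgent implementations, while MDPLoop, run, and plot are the objects documented on this page.

from colosseum.experiment.agent_mdp_interaction import MDPLoop

# MyEpisodicMDP and MyAgent are hypothetical placeholders for concrete
# EpisodicMDP / BaseAgent implementations.
mdp = MyEpisodicMDP(seed=42)
agent = MyAgent(seed=42)

loop = MDPLoop(mdp, agent)  # an InMemoryLogger is created by default
last_training_step, last_logs = loop.run(T=10_000, log_every=100, max_time=60.0)

# last_training_step is -1 if the time constraint was respected;
# last_logs contains the indicators computed at the end of the interactions.
loop.plot(indicator="cumulative_regret")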

MDPLoop( mdp: Union[colosseum.mdp.base.BaseMDP, colosseum.mdp.base_finite.EpisodicMDP, colosseum.mdp.base_infinite.ContinuousMDP], agent: colosseum.agent.agents.base.BaseAgent, logger: colosseum.utils.acme.base_logger.Logger = None, n_log_intervals_to_check_for_agent_optimality: int = 10, enforce_time_constraint: bool = True)
108    def __init__(
109        self,
110        mdp: Union["BaseMDP", "EpisodicMDP", "ContinuousMDP"],
111        agent: "BaseAgent",
112        logger: Logger = None,
113        n_log_intervals_to_check_for_agent_optimality: int = 10,
114        enforce_time_constraint: bool = True,
115    ) -> object:
116        """
117        Parameters
118        ----------
119        mdp: Union["EpisodicMDP", "ContinuousMDP"]
120            The MDP.
121        agent : BaseAgent
122            The agent.
123        logger : Logger
124            The logger where the results of the interaction between the agent and the MDP are stored. By default, the
125            `InMemoryLogger` is used.
126        n_log_intervals_to_check_for_agent_optimality : int
127            The number of logging intervals over which the regret is monitored when checking whether the agent's
128            policy has reached optimality. By default, the last ten logging intervals are considered.
129        enforce_time_constraint : bool
130            If True, the computational time constraint given in the `run` function is enforced through multithreading.
131            By default, it is enforced.
132        """
133
134        if logger is None:
135            logger = InMemoryLogger()
136
137        self.logger = logger
138        self._enforce_time_constraint = enforce_time_constraint
139        self._mdp = mdp
140        self._agent = agent
141        self._episodic = self._mdp.is_episodic()
142        self._n_steps_to_check_for_agent_optimality = (
143            n_log_intervals_to_check_for_agent_optimality
144        )
145        assert self._episodic == agent.is_episodic()
146        assert self._agent.is_emission_map_accepted(
147            Tabular if self._mdp.emission_map is None else self._mdp.emission_map
148        )
149        self.actions_sequence = []
Parameters
  • mdp (Union["EpisodicMDP", "ContinuousMDP"]): The MDP.
  • agent (BaseAgent): The agent.
  • logger (Logger): The logger where the results of the interaction between the agent and the MDP are stored. By default, the InMemoryLogger is used.
  • n_log_intervals_to_check_for_agent_optimality (int): The number of logging intervals over which the regret is monitored when checking whether the agent's policy has reached optimality. By default, the last ten logging intervals are considered.
  • enforce_time_constraint (bool): If True, the computational time constraint given in the run function is enforced through multithreading. By default, it is enforced.
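A minimal construction sketch follows. The MDP and agent objects are placeholders for any compatible pair (their concrete classes and constructors are not specified on this page), and the default InMemoryLogger is used when no logger is passed; see run below for the meaning of the returned values.

    from colosseum.experiment.agent_mdp_interaction import MDPLoop

    # `my_mdp` and `my_agent` are placeholders for a compatible MDP/agent pair;
    # MDPLoop asserts that both are episodic or both are continuous.
    loop = MDPLoop(mdp=my_mdp, agent=my_agent)  # defaults to InMemoryLogger
    last_training_step, final_logs = loop.run(T=10_000, log_every=100)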
@staticmethod
def get_indicators() -> List[str]:
39    @staticmethod
40    def get_indicators() -> List[str]:
41        """
42        Returns
43        -------
44        List[str]
45            The code names for the indicators that are computed by the MDPLoop.
46        """
47        return [
48            "cumulative_expected_reward",
49            "cumulative_regret",
50            "cumulative_reward",
51            "normalized_cumulative_expected_reward",
52            "normalized_cumulative_regret",
53            "normalized_cumulative_reward",
54            "steps_per_second",
55        ]
Returns
  • List[str]: The code names for the indicators that are computed by the MDPLoop.
@staticmethod
def get_baseline_indicators() -> List[str]:
57    @staticmethod
58    def get_baseline_indicators() -> List[str]:
59        """
60        Returns
61        -------
62        List[str]
63            The code names for the baseline indicators that are computed by the MDPLoop.
64        """
65        return [
66            "random_cumulative_regret",
67            "random_cumulative_expected_reward",
68            "random_normalized_cumulative_regret",
69            "random_normalized_cumulative_expected_reward",
70            "optimal_cumulative_expected_reward",
71            "optimal_normalized_cumulative_expected_reward",
72            "worst_cumulative_regret",
73            "worst_cumulative_expected_reward",
74            "worst_normalized_cumulative_regret",
75            "worst_normalized_cumulative_expected_reward",
76        ]
Returns
  • List[str]: The code names for the baseline indicators that are computed by the MDPLoop.
@staticmethod
def get_baselines() -> Set[str]:
78    @staticmethod
79    def get_baselines() -> Set[str]:
80        """
81        Returns
82        -------
83        Set[str]
84            The baselines available for comparison.
85        """
86        return set(b[: b.find("_")] for b in MDPLoop.get_baseline_indicators())
Returns
  • Set[str]: The baselines available for comparison.
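As the source above shows, the baseline names are simply the prefixes (everything before the first underscore) of the baseline indicator names. A quick check:

    from colosseum.experiment.agent_mdp_interaction import MDPLoop

    assert MDPLoop.get_baselines() == {"random", "optimal", "worst"}
    assert "optimal_cumulative_expected_reward" in MDPLoop.get_baseline_indicators()
    assert "cumulative_regret" in MDPLoop.get_indicators()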
@staticmethod
def get_baselines_color_dict() -> Dict[str, str]:
88    @staticmethod
89    def get_baselines_color_dict() -> Dict[str, str]:
90        """
91        Returns
92        -------
93        Dict[str, str]
94            The color associated by default to the baselines.
95        """
96        return dict(random="black", worst="crimson", optimal="gold")
Returns
  • Dict[str, str]: The color associated by default to the baselines.
@staticmethod
def get_baselines_style_dict():
 98    @staticmethod
 99    def get_baselines_style_dict():
100        """
101        Returns
102        -------
103        Dict[str, str]
104            The line style associated by default to the baselines.
105        """
106        return dict(random=(0, (6, 12)), worst=(9, (6, 12)), optimal=(0, (6, 12)))
Returns
  • Dict[str, str]: The line style associated by default to the baselines.
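The two dictionaries above can be passed straight to matplotlib, since the values are named colours and (offset, (on, off)) dash tuples. A brief sketch, where the time steps and baseline values are dummy data standing in for a series obtained elsewhere (e.g. from the logger's data):

    import matplotlib.pyplot as plt
    from colosseum.experiment.agent_mdp_interaction import MDPLoop

    colors = MDPLoop.get_baselines_color_dict()
    styles = MDPLoop.get_baselines_style_dict()

    # Dummy baseline series, purely illustrative.
    time_steps = [0, 100, 200, 300]
    values = [0.0, 1.5, 2.9, 4.2]

    fig, ax = plt.subplots()
    ax.plot(time_steps, values, color=colors["optimal"], linestyle=styles["optimal"], label="Optimal")
    ax.legend()
    plt.show()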
remaining_time: float
Returns
  • float: The remaining computational time for training the agent.
def run( self, T: int, log_every: int = -1, max_time: float = inf) -> Tuple[int, Dict[str, float]]:
180    def run(
181        self,
182        T: int,
183        log_every: int = -1,
184        max_time: float = np.inf,
185    ) -> Tuple[int, Dict[str, float]]:
186        """
187        runs the agent/MDP interactions.
188
189        Parameters
190        ----------
191        T : int
192            The number of total interactions between the agent and the MDP.
193        log_every : int
194            The number of time steps between successive computations of the performance indicators. By default, they
195            are only computed at the end of the interactions.
196        max_time : float
197            The maximum number of seconds the interactions can take. If it is exceeded, the loop is interrupted.
198            By default, no time limit is enforced.
199
200        Returns
201        ----------
202        int
203            The time step at which the training was interrupted due to the time constraint. If the constraint has
204            been respected, -1 is returned.
205        Dict[str, float]
206            The performance indicators computed at the end of the interactions.
207        """
208
209        if max_time == np.inf:
210            enforce_time_constraint = False
211        else:
212            enforce_time_constraint = self._enforce_time_constraint
213
214        assert (
215            type(log_every) == int
216        ), f"The log_every variable should be an integer, received value: {log_every}."
217        log_every = -1 if log_every == 0 else log_every
218
219        # Reset the visitation count of the MDP
220        self._mdp.reset_visitation_counts()
221
222        self._reset_run_variables()
223        self._max_time = max_time
224
225        ts = self._mdp.reset()
226        first_before_new_episode_timer = time()
227        if enforce_time_constraint and self.remaining_time < np.inf:
228            self._limit_update_time(0, self._agent.before_start_interacting)
229        else:
230            self._agent.before_start_interacting()
231        if config._DEBUG_LEVEL > 0:
232            if self._is_training:
233                do = f"before_start_interacting completed in {time() - first_before_new_episode_timer:.2f}."
234            else:
235                do = "before_start_interacting exceeded the time limit."
236            process_debug_output(do)
237
238        self._set_loop(T)
239        for t in self._loop:
240            if self._is_training and self.remaining_time < 0.5:
241                self._limit_exceeded(t)
242
243            # MDP step
244            h = self._mdp.h
245            action = self._agent.select_action(ts, h)
246            new_ts = self._mdp.step(action)
247            self.actions_sequence.append(new_ts.reward)
248
249            # Single step agent update
250            if self._is_training:
251                if enforce_time_constraint and self.remaining_time < np.inf:
252                    self._limit_update_time(
253                        t,
254                        lambda: self._agent.step_update(ts, action, new_ts, h),
255                    )
256                else:
257                    self._agent.step_update(ts, action, new_ts, h)
258
259            # End of episode agent update
260            if self._is_training and self._agent.is_episode_end(ts, action, new_ts, h):
261                if enforce_time_constraint and self.remaining_time < np.inf:
262                    self._limit_update_time(t, self._agent.episode_end_update)
263                else:
264                    self._agent.episode_end_update()
265
266            if t > 0 and log_every > 0 and t % log_every == 0:
267                # Log the performance of the agent
268                self._update_performance_logs(t)
269                self._n_steps_since_last_log = 0
270
271                # User defined custom log
272                self._agent.agent_logs()
273
274                # Verbose loggings
275                self._update_user_loggings(t)
276
277                # Storing the latest regrets
278                self._latest_expected_regrets.append(self._normalized_regret)
279                if (
280                    len(self._latest_expected_regrets)
281                    > self._n_steps_to_check_for_agent_optimality
282                ):
283                    self._latest_expected_regrets.pop(0)
284
285                # Stop training if the agent has confidently reached the optimal policy
286                if self._is_training and t > 0.2 * T and self._is_policy_optimal():
287                    if type(self._loop) == tqdm.std.tqdm:
288                        self._verbose_postfix["is_training"] = f"No, optimal at {t}"
289                    self._is_training = False
290
291            self._n_steps_since_last_log += 1
292            self._cumulative_reward += new_ts.reward
293            ts = new_ts
294
295            # Resetting episodic MDPs
296            if self._mdp.is_episodic() and new_ts.last():
297                assert self._mdp.necessary_reset or t == T - 2
298                ts = self._mdp.reset()
299                self._n_episodes += 1
300
301        self._update_performance_logs(t)
302        self.logger.close()
303        return self._last_training_step, self._last_logs

runs the agent/MDP interactions.

Parameters
  • T (int): The number of total interactions between the agent and the MDP.
  • log_every (int): The number of time steps between successive computations of the performance indicators. By default, they are only computed at the end of the interactions.
  • max_time (float): The maximum number of seconds the interactions can take. If it is exceeded, the loop is interrupted. By default, no time limit is enforced.
Returns
  • int: The time step at which the training was interrupted due to the time constraint. If the constraint has been respected, -1 is returned.
  • Dict[str, float]: The performance indicators computed at the end of the interactions.
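A usage sketch, assuming `loop` is an MDPLoop built as in the constructor example above. The keys of the returned dictionary are taken here to follow the indicator code names listed by get_indicators(), which is an assumption based on the documentation above rather than a guaranteed contract.

    # Interact for 50,000 steps, compute indicators every 500 steps,
    # and stop early if more than 60 seconds are consumed.
    last_training_step, final_logs = loop.run(T=50_000, log_every=500, max_time=60.0)

    if last_training_step == -1:
        print("The time constraint was respected.")
    # The exact key names are assumed to match the indicator code names.
    print(final_logs.get("normalized_cumulative_regret"))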
def plot( self, indicator: str = 'cumulative_regret', ax=None, baselines=('random', 'worst', 'optimal'), label=None):
619    def plot(
620        self,
621        indicator: str = "cumulative_regret",
622        ax=None,
623        baselines=("random", "worst", "optimal"),
624        label=None,
625    ):
626        """
627        plots the values of the indicator obtained by the agent during the interactions along with the baseline values.
628
629        Parameters
630        ----------
631        indicator : str
632            The code name of the performance indicator that will be shown in the plot. Check `MDPLoop.get_indicators()`
633            to get a list of the available indicators. By default, the 'cumulative_regret' is shown.
634        ax : plt.Axes
635            The axes object on which the plot will be drawn. By default, a new axes object is created.
636        baselines : List[str]
637            The baselines to be included in the plot. Check `MDPLoop.get_baselines()` to get a list of the available
638            baselines. By default, all baselines are shown.
639        label : str
640            The label to be given to the agent. By default, a cleaned version of the agent class name is used.
641        """
642
643        show = ax is None
644        if ax is None:
645            fig, ax = plt.subplots()
646
647        assert indicator in self.get_indicators(), (
648            f"{indicator} is not an indicator. The indicators available are: "
649            + ",".join(self.get_indicators())
650            + "."
651        )
652
653        df_e = pd.DataFrame(self.logger.data)
654        time_steps = [0] + df_e.loc[:, "steps"].tolist()
655        ax.plot(
656            time_steps[1:] if indicator == "steps_per_second" else time_steps,
657            ([] if indicator == "steps_per_second" else [0])
658            + df_e.loc[:, indicator].tolist(),
659            label=clear_agent_mdp_class_name(type(self._agent).__name__)
660            if label is None
661            else label,
662        )
663        ax.set_ylabel(indicator.replace("_", " ").capitalize())
664
665        for b in baselines:
666            indicator = indicator.replace(
667                "cumulative_reward", "cumulative_expected_reward"
668            )
669            if b + "_" + indicator in self.get_baseline_indicators():
670                ax.plot(
671                    time_steps,
672                    [0] + df_e.loc[:, b + "_" + indicator].tolist(),
673                    label=b.capitalize(),
674                    # alpha=0.9,
675                    linestyle=(0, (5, 10)),
676                    color="darkolivegreen"
677                    if "optimal" in b
678                    else ("darkred" if "worst" in b else "darkslategray"),
679                    linewidth=2,
680                )
681
682        ax.set_xlabel("time step")
683        ax.legend()
684        if show:
685            plt.tight_layout()
686            plt.show()

plots the values of the indicator obtained by the agent during the interactions along with the baseline values.

Parameters
  • indicator (str): The code name of the performance indicator that will be shown in the plot. Check MDPLoop.get_indicators() to get a list of the available indicators. By default, the 'cumulative_regret' is shown.
  • ax (plt.Axes): The axes object on which the plot will be drawn. By default, a new axes object is created.
  • baselines (List[str]): The baselines to be included in the plot. Check MDPLoop.get_baselines() to get a list of the available baselines. By default, all baselines are shown.
  • label (str): The label to be given to the agent. By default, a cleaned version of the agent class name is used.
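A usage sketch, assuming `loop` has already gone through run so that the logger contains data; "MyAgent" is an illustrative label. Since an axes object is passed explicitly, plot does not call plt.show() itself.

    import matplotlib.pyplot as plt

    # Draw onto a user-provided axes; all three baselines are available
    # for the normalized cumulative expected reward indicator.
    fig, ax = plt.subplots(figsize=(6, 4))
    loop.plot(
        indicator="normalized_cumulative_expected_reward",
        ax=ax,
        label="MyAgent",  # overrides the cleaned agent class name
    )
    fig.tight_layout()
    plt.show()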