colosseum.agent.agents.infinite_horizon.ucrl2

import math
import os
from typing import TYPE_CHECKING, Any, Callable, Dict, Union

import dm_env
import gin
import numpy as np
from ray import tune

from colosseum.agent.actors import QValuesActor
from colosseum.agent.agents.base import BaseAgent
from colosseum.dynamic_programming import discounted_value_iteration
from colosseum.dynamic_programming.infinite_horizon import extended_value_iteration
from colosseum.dynamic_programming.utils import get_policy_from_q_values
from colosseum.emission_maps import EmissionMap

if TYPE_CHECKING:
    from colosseum.mdp import ACTION_TYPE
    from colosseum.utils.acme.specs import MDPSpec


def _chernoff(it, N, delta, sqrt_C, log_C, range=1.0):
    ci = range * np.sqrt(sqrt_C * math.log(log_C * (it + 1) / delta) / np.maximum(1, N))
    return ci


def bernstein(scale_a, log_scale_a, scale_b, log_scale_b, alpha_1, alpha_2):
    A = scale_a * math.log(log_scale_a)
    B = scale_b * math.log(log_scale_b)
    return alpha_1 * np.sqrt(A) + alpha_2 * B


@gin.configurable
class UCRL2Continuous(BaseAgent):
    """
    The second version of the upper confidence reinforcement learning algorithm (UCRL2).

    Auer, Peter, Thomas Jaksch, and Ronald Ortner. "Near-optimal regret bounds for reinforcement learning." Advances in
    neural information processing systems 21 (2008).

    Fruit, Ronan, Matteo Pirotta, and Alessandro Lazaric. "Improved analysis of UCRL2 with empirical Bernstein inequality."
    arXiv preprint arXiv:2007.05456 (2020).
    """

    @staticmethod
    def is_emission_map_accepted(emission_map: "EmissionMap") -> bool:
        return emission_map.is_tabular

    @staticmethod
    def produce_gin_file_from_parameters(parameters: Dict[str, Any], index: int = 0):
        string = f"prms_{index}/UCRL2Continuous.bound_type_p='bernstein'\n"
        for k, v in parameters.items():
            string += f"prms_{index}/UCRL2Continuous.{k} = {v}\n"
        return string[:-1]

    @staticmethod
    def is_episodic() -> bool:
        return False

    @staticmethod
    def get_hyperparameters_search_spaces() -> Dict[str, tune.sample.Domain]:
        return {"alpha_p": tune.uniform(0.1, 3), "alpha_r": tune.uniform(0.1, 3)}

    @staticmethod
    def get_agent_instance_from_parameters(
        seed: int,
        optimization_horizon: int,
        mdp_specs: "MDPSpec",
        parameters: Dict[str, Any],
    ) -> "BaseAgent":
        return UCRL2Continuous(
            mdp_specs=mdp_specs,
            seed=seed,
            optimization_horizon=optimization_horizon,
            alpha_p=parameters["alpha_p"],
            alpha_r=parameters["alpha_r"],
            bound_type_p="bernstein",
        )

    @property
    def current_optimal_stochastic_policy(self) -> np.ndarray:
        Q, _ = discounted_value_iteration(self.P, self.estimated_rewards)
        return get_policy_from_q_values(Q, True)

    def __init__(
        self,
        seed: int,
        mdp_specs: "MDPSpec",
        optimization_horizon: int,
        # MDP model parameters
        alpha_r=1.0,
        alpha_p=1.0,
        bound_type_p="_chernoff",
        bound_type_rew="_chernoff",
        # Actor parameters
        epsilon_greedy: Union[float, Callable] = None,
        boltzmann_temperature: Union[float, Callable] = None,
    ):
        r"""
        Parameters
        ----------
        seed : int
            The random seed.
        mdp_specs : MDPSpec
            The full specification of the MDP.
        optimization_horizon : int
            The total number of interactions that the agent is expected to have with the MDP.
        alpha_r : float
            The :math:`\alpha` parameter for the rewards. By default, it is set to one.
        alpha_p : float
            The :math:`\alpha` parameter for the transitions. By default, it is set to one.
        bound_type_p : str
            The upper confidence bound type for the transitions. It can either be '_chernoff' or 'bernstein'. By default,
            it is set to '_chernoff'.
        bound_type_rew : str
            The upper confidence bound type for the rewards. It can either be '_chernoff' or 'bernstein'. By default,
            it is set to '_chernoff'.
        epsilon_greedy : Union[float, Callable], optional
            The probability of selecting an action at random. It can be provided as a float or as a function of the
            total number of interactions. By default, the probability is set to zero.
        boltzmann_temperature : Union[float, Callable], optional
            The parameter that controls the Boltzmann exploration. It can be provided as a float or as a function of
            the total number of interactions. By default, Boltzmann exploration is disabled.
        """

        n_states = self._n_states = mdp_specs.observations.num_values
        n_actions = self._n_actions = mdp_specs.actions.num_values
        self.reward_range = mdp_specs.rewards_range

        assert bound_type_p in ["_chernoff", "bernstein"]
        assert bound_type_rew in ["_chernoff", "bernstein"]

        self.alpha_p = alpha_p
        self.alpha_r = alpha_r

        # initialize matrices
        self.policy = np.zeros((n_states,), dtype=np.int_)
        self.policy_indices = np.zeros((n_states,), dtype=np.int_)

        # initialization
        self.iteration = 0
        self.episode = 0
        self.delta = 1.0  # confidence
        self.bound_type_p = bound_type_p
        self.bound_type_rew = bound_type_rew

        self.P = np.ones((n_states, n_actions, n_states), np.float32) / n_states

        self.estimated_rewards = (
            np.ones((n_states, n_actions), np.float32) * mdp_specs.rewards_range[1]
        )
        self.variance_proxy_reward = np.zeros((n_states, n_actions), np.float32)
        self.estimated_holding_times = np.ones((n_states, n_actions), np.float32)
        self.N = np.zeros((n_states, n_actions, n_states), dtype=np.int32)

        self.current_state = None
        self.artificial_episode = 0
        self.episode_reward_data = dict()
        self.episode_transition_data = dict()

        super(UCRL2Continuous, self).__init__(
            seed,
            mdp_specs,
            None,
            QValuesActor(seed, mdp_specs, epsilon_greedy, boltzmann_temperature),
            optimization_horizon,
        )

    def is_episode_end(
        self,
        ts_t: dm_env.TimeStep,
        a_t: "ACTION_TYPE",
        ts_tp1: dm_env.TimeStep,
        time: int,
    ) -> bool:
        nu_k = len(self.episode_transition_data[ts_t.observation, a_t])
        return nu_k >= max(1, self.N[ts_t.observation, a_t].sum() - nu_k)

    def episode_end_update(self):
        self.episode += 1
        self.delta = 1 / math.sqrt(self.iteration + 1)

        new_sp = self.solve_optimistic_model()
        if new_sp is not None:
            self.span_value = new_sp / self.reward_range[1]

        if len(self.episode_transition_data) > 0:
            self.model_update()
            self.episode_reward_data = dict()
            self.episode_transition_data = dict()

    def before_start_interacting(self):
        self.episode_end_update()

    def step_update(
        self, ts_t: dm_env.TimeStep, a_t: "ACTION_TYPE", ts_tp1: dm_env.TimeStep, h: int
    ):
        self.N[ts_t.observation, a_t, ts_tp1.observation] += 1

        if (ts_t.observation, a_t) in self.episode_reward_data:
            self.episode_reward_data[ts_t.observation, a_t].append(ts_tp1.reward)
            if not ts_tp1.last():
                self.episode_transition_data[ts_t.observation, a_t].append(
                    ts_tp1.observation
                )
        else:
            self.episode_reward_data[ts_t.observation, a_t] = [ts_tp1.reward]
            if not ts_tp1.last():
                self.episode_transition_data[ts_t.observation, a_t] = [
                    ts_tp1.observation
                ]

    def model_update(self):
        """
        updates the model given the transitions obtained during the artificial episode.
        """
        for (s_tm1, action), r_ts in self.episode_reward_data.items():
            # total number of recorded visits to (s_tm1, action)
            scale_f = self.N[s_tm1, action].sum()
            for r in r_ts:
                # update the total number of iterations
                self.iteration += 1

                # update reward and variance estimate
                scale_f += 1
                old_estimated_reward = self.estimated_rewards[s_tm1, action]
                self.estimated_rewards[s_tm1, action] *= scale_f / (scale_f + 1.0)
                self.estimated_rewards[s_tm1, action] += r / (scale_f + 1.0)
                self.variance_proxy_reward[s_tm1, action] += (
                    r - old_estimated_reward
                ) * (r - self.estimated_rewards[s_tm1, action])

                # update holding time
                self.estimated_holding_times[s_tm1, action] *= scale_f / (scale_f + 1.0)
                self.estimated_holding_times[s_tm1, action] += 1 / (scale_f + 1)

        for (s_tm1, action) in set(self.episode_transition_data.keys()):
            self.P[s_tm1, action] = self.N[s_tm1, action] / self.N[s_tm1, action].sum()

    def beta_r(self, nb_observations) -> np.ndarray:
        """
        calculates the confidence bounds on the reward.
        Returns
        -------
        np.array
            The vector of confidence bounds on the reward function (|S| x |A|)
        """
        S = self._n_states
        A = self._n_actions
        if self.bound_type_rew != "bernstein":
            ci = _chernoff(
                it=self.iteration,
                N=nb_observations,
                range=self.reward_range[1],
                delta=self.delta,
                sqrt_C=3.5,
                log_C=2 * S * A,
            )
            return self.alpha_r * ci
        else:
            N = np.maximum(1, nb_observations)
            Nm1 = np.maximum(1, nb_observations - 1)
            var_r = self.variance_proxy_reward / Nm1
            log_value = 2.0 * S * A * (self.iteration + 1) / self.delta
            beta = bernstein(
                scale_a=14 * var_r / N,
                log_scale_a=log_value,
                scale_b=49.0 * self.r_max / (3.0 * Nm1),
                log_scale_b=log_value,
                alpha_1=math.sqrt(self.alpha_r),
                alpha_2=self.alpha_r,
            )
            return beta

    def beta_p(self, nb_observations) -> np.ndarray:
        """
        calculates the confidence bounds on the transition probabilities.
        Returns
        -------
        np.array
            The confidence bounds on the transition probabilities: |S| x |A| x 1 with the Chernoff bound and
            |S| x |A| x |S| with the Bernstein bound.
        """
        S = self._n_states
        A = self._n_actions
        if self.bound_type_p != "bernstein":
            beta = _chernoff(
                it=self.iteration,
                N=nb_observations,
                range=1.0,
                delta=self.delta,
                sqrt_C=14 * S,
                log_C=2 * A,
            )
            return self.alpha_p * beta.reshape([S, A, 1])
        else:
            N = np.maximum(1, nb_observations)
            Nm1 = np.maximum(1, nb_observations - 1)
            var_p = self.P * (1.0 - self.P)
            log_value = 2.0 * S * A * (self.iteration + 1) / self.delta
            beta = bernstein(
                scale_a=14 * var_p / N[:, :, np.newaxis],
                log_scale_a=log_value,
                scale_b=49.0 / (3.0 * Nm1[:, :, np.newaxis]),
                log_scale_b=log_value,
                alpha_1=math.sqrt(self.alpha_p),
                alpha_2=self.alpha_p,
            )
            return beta

    def solve_optimistic_model(self) -> Union[None, float]:
        """
        runs extended value iteration on the optimistic model of the MDP.
        Returns
        -------
        Union[None, float]
            The span value of the estimates from the optimistic value iteration or None if no solution has been found.
        """
        nb_observations = self.N.sum(-1)

        beta_r = self.beta_r(nb_observations)  # confidence bounds on rewards
        beta_p = self.beta_p(
            nb_observations
        )  # confidence bounds on transition probabilities

        T = self.P
        estimated_rewards = self.estimated_rewards

        assert np.isclose(T.sum(-1), 1.0).all()
        try:
            res = extended_value_iteration(
                T, estimated_rewards, beta_r, beta_p, self.reward_range[1]
            )
        except SystemError:
            # Dump the inputs for debugging if the optimistic value iteration fails
            os.makedirs(f"tmp{os.sep}error_ext_vi", exist_ok=True)
            for i in range(100):
                if not os.path.isfile(f"tmp{os.sep}error_ext_vi{os.sep}T{i}.npy"):
                    np.save(f"tmp{os.sep}error_ext_vi{os.sep}T{i}.npy", T)
                    np.save(
                        f"tmp{os.sep}error_ext_vi{os.sep}estimated_rewards{i}.npy",
                        estimated_rewards,
                    )
                    np.save(f"tmp{os.sep}error_ext_vi{os.sep}beta_r{i}.npy", beta_r)
                    np.save(f"tmp{os.sep}error_ext_vi{os.sep}beta_p{i}.npy", beta_p)
                    break
            res = None

        if res is not None:
            span_value_new, self.Q, self.V = res
            span_value = span_value_new
            self._actor.set_q_values(self.Q)

            assert span_value >= 0, "The span value cannot be lower than zero"
            assert np.abs(span_value - span_value_new) < 1e-8

            return span_value
        return None
def bernstein(scale_a, log_scale_a, scale_b, log_scale_b, alpha_1, alpha_2):
@gin.configurable
class UCRL2Continuous(colosseum.agent.agents.base.BaseAgent):

The second version of the upper confidence reinforcement learning algorithm (UCRL2).

Auer, Peter, Thomas Jaksch, and Ronald Ortner. "Near-optimal regret bounds for reinforcement learning." Advances in neural information processing systems 21 (2008).

Fruit, Ronan, Matteo Pirotta, and Alessandro Lazaric. "Improved analysis of UCRL2 with empirical Bernstein inequality." arXiv preprint arXiv:2007.05456 (2020).
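
A minimal construction sketch follows. It relies only on the constructor shown in the source above and assumes that mdp_specs (an MDPSpec), the seed, and the optimization horizon are obtained elsewhere from colosseum.

from colosseum.agent.agents.infinite_horizon.ucrl2 import UCRL2Continuous

def make_ucrl2_agent(seed: int, mdp_specs, optimization_horizon: int) -> UCRL2Continuous:
    # Bernstein bounds for the transitions; Chernoff bounds remain the default for the rewards.
    return UCRL2Continuous(
        seed=seed,
        mdp_specs=mdp_specs,
        optimization_horizon=optimization_horizon,
        alpha_r=1.0,
        alpha_p=1.0,
        bound_type_p="bernstein",
    )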

UCRL2Continuous( seed: int, mdp_specs: colosseum.utils.acme.specs.MDPSpec, optimization_horizon: int, alpha_r=1.0, alpha_p=1.0, bound_type_p='_chernoff', bound_type_rew='_chernoff', epsilon_greedy: Union[float, Callable] = None, boltzmann_temperature: Union[float, Callable] = None)
Parameters
  • seed (int): The random seed.
  • mdp_specs (MDPSpec): The full specification of the MDP.
  • optimization_horizon (int): The total number of interactions that the agent is expected to have with the MDP.
  • alpha_r (float): The α parameter for the rewards. By default, it is set to one.
  • alpha_p (float): The α parameter for the transitions. By default, it is set to one.
  • bound_type_p (str): The upper confidence bound type for the transitions. It can either be '_chernoff' or 'bernstein'. By default, it is set to '_chernoff'.
  • bound_type_rew (str): The upper confidence bound type for the rewards. It can either be '_chernoff' or 'bernstein'. By default, it is set to '_chernoff'.
  • epsilon_greedy (Union[float, Callable], optional): The probability of selecting an action at random. It can be provided as a float or as a function of the total number of interactions. By default, the probability is set to zero.
  • boltzmann_temperature (Union[float, Callable], optional): The parameter that controls the Boltzmann exploration. It can be provided as a float or as a function of the total number of interactions. By default, Boltzmann exploration is disabled.
@staticmethod
def is_emission_map_accepted(emission_map: colosseum.emission_maps.base.EmissionMap) -> bool:
Returns
  • bool: True if the agent class accepts the emission map.
@staticmethod
def produce_gin_file_from_parameters(parameters: Dict[str, Any], index: int = 0):

produces a string containing the gin configuration corresponding to the given parameters (see the example below).

Parameters
  • parameters (Dict[str, Any]): The dictionary containing the parameters of the agent.
  • index (int): The index assigned to the gin configuration.
Returns
  • gin_config (str): The gin configuration file.
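
For example, with two illustrative parameter values the call below produces the following bindings (the expected output follows directly from the source shown at the top of the page):

from colosseum.agent.agents.infinite_horizon.ucrl2 import UCRL2Continuous

gin_config = UCRL2Continuous.produce_gin_file_from_parameters(
    {"alpha_p": 0.5, "alpha_r": 1.5}, index=0
)
print(gin_config)
# prms_0/UCRL2Continuous.bound_type_p='bernstein'
# prms_0/UCRL2Continuous.alpha_p = 0.5
# prms_0/UCRL2Continuous.alpha_r = 1.5
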
@staticmethod
def is_episodic() -> bool:
Returns
  • bool: True if the agent is suited for the episodic setting.
@staticmethod
def get_hyperparameters_search_spaces() -> Dict[str, ray.tune.sample.Domain]:
Returns
  • Dict[str, tune.sample.Domain]: A dictionary mapping hyperparameter names to the corresponding ray.tune sampling domains.
@staticmethod
def get_agent_instance_from_parameters( seed: int, optimization_horizon: int, mdp_specs: colosseum.utils.acme.specs.MDPSpec, parameters: Dict[str, Any]) -> colosseum.agent.agents.base.BaseAgent:

returns an agent instance for the given MDP specification and agent parameters (a sketch follows the parameter list below).

Parameters
  • seed (int): The random seed.
  • optimization_horizon (int): The total number of interactions that the agent is expected to have with the MDP.
  • mdp_specs (MDPSpec): The full specification of the MDP.
  • parameters (Dict[str, Any]): The dictionary containing the parameters of the agent.
Returns
  • BaseAgent: The agent instance.
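
A sketch combining this factory with get_hyperparameters_search_spaces is given below. It assumes an MDPSpec obtained elsewhere from colosseum and uses the sample() method of the returned ray.tune domains to draw one configuration.

from colosseum.agent.agents.infinite_horizon.ucrl2 import UCRL2Continuous

def sample_and_build_agent(seed: int, optimization_horizon: int, mdp_specs):
    # Draw one hyperparameter configuration from the ray.tune search space.
    space = UCRL2Continuous.get_hyperparameters_search_spaces()
    parameters = {name: domain.sample() for name, domain in space.items()}
    # bound_type_p is fixed to 'bernstein' by this factory method.
    return UCRL2Continuous.get_agent_instance_from_parameters(
        seed=seed,
        optimization_horizon=optimization_horizon,
        mdp_specs=mdp_specs,
        parameters=parameters,
    )
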
current_optimal_stochastic_policy: numpy.ndarray
Returns
  • np.ndarray: The estimate of the optimal policy given the agent's current knowledge, expressed as a distribution over actions for each state.
def is_episode_end( self, ts_t: dm_env._environment.TimeStep, a_t: Union[int, float, numpy.ndarray], ts_tp1: dm_env._environment.TimeStep, time: int) -> bool:

checks whether the current artificial episode should end. UCRL2Continuous ends an artificial episode once the number of visits to the current state-action pair collected within the episode reaches the number of visits recorded before the episode started (the doubling criterion, illustrated after the parameter list below).

Parameters
  • ts_t (dm_env.TimeStep): The TimeStep at time t.
  • a_t ("ACTION_TYPE"): The action taken by the agent at time t.
  • ts_tp1 (dm_env.TimeStep): The TimeStep at time t + 1.
  • time (int): The current time of the environment. In the episodic case, this refers to the in-episode time, whereas in the continuous case this refers to the total number of previous interactions.
Returns
  • bool: True if the episode terminated at time t+1.
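
The standalone snippet below illustrates the doubling criterion with plain integers; it mirrors the return expression of is_episode_end rather than calling the agent itself.

def episode_should_end(nu_k: int, total_visits: int) -> bool:
    # nu_k: visits to (s, a) collected within the current artificial episode.
    # total_visits: all recorded visits to (s, a), i.e. N[s, a].sum() in the agent.
    return nu_k >= max(1, total_visits - nu_k)

assert episode_should_end(nu_k=4, total_visits=8)      # in-episode visits caught up
assert not episode_should_end(nu_k=3, total_visits=8)  # keep the episode going
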
def episode_end_update(self):

is called when an episode ends. In the infinite horizon case, this refers to artificial episodes.

def before_start_interacting(self):

is called before the agent starts interacting with the MDP.

def step_update( self, ts_t: dm_env._environment.TimeStep, a_t: Union[int, float, numpy.ndarray], ts_tp1: dm_env._environment.TimeStep, h: int):

adds the given transition to the MDP model.

Parameters
  • ts_t (dm_env.TimeStep): The TimeStep at time t.
  • a_t ("ACTION_TYPE"): The action taken by the agent at time t.
  • ts_tp1 (dm_env.TimeStep): The TimeStep at time t + 1.
  • h (int): The current time of the environment. In the episodic case, this refers to the in-episode time, whereas in the continuous case this refers to the total number of previous interactions.
def model_update(self):

updates the model given the transitions obtained during the artificial episode.
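
The reward bookkeeping above maintains a running mean together with a Welford-style variance proxy (a running sum of squared deviations, later normalised in beta_r). The sketch below shows this general pattern in isolation; it is not a line-by-line reproduction of the method.

import numpy as np

def update_reward_stats(mean: float, var_proxy: float, count: int, r: float):
    # count is the number of rewards already folded into the running mean.
    new_mean = (count * mean + r) / (count + 1)
    new_var_proxy = var_proxy + (r - mean) * (r - new_mean)  # Welford update
    return new_mean, new_var_proxy, count + 1

mean, var_proxy, n = 0.0, 0.0, 0
for r in [0.2, 0.5, 0.3]:
    mean, var_proxy, n = update_reward_stats(mean, var_proxy, n, r)
np.testing.assert_allclose(mean, np.mean([0.2, 0.5, 0.3]))
np.testing.assert_allclose(var_proxy, ((np.array([0.2, 0.5, 0.3]) - mean) ** 2).sum())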

def beta_r(self, nb_observations) -> numpy.ndarray:

calculates the confidence bounds on the reward.

Returns
  • np.array: The vector of confidence bounds on the reward function (|S| x |A|)
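
The Chernoff branch can be evaluated in isolation with the module-level _chernoff helper shown in the source at the top of the page (toy counts below; in beta_r the range argument would be reward_range[1] rather than the default 1.0):

import numpy as np
from colosseum.agent.agents.infinite_horizon.ucrl2 import _chernoff

S, A = 3, 2
nb_observations = np.array([[10, 0], [3, 7], [1, 20]])  # visit counts per (s, a)
ci = _chernoff(it=1000, N=nb_observations, delta=0.01, sqrt_C=3.5, log_C=2 * S * A)
print(ci.shape)  # (3, 2): one bound per state-action pair, larger where counts are small
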
def beta_p(self, nb_observations) -> numpy.ndarray:

calculates the confidence bounds on the transition probabilities.

Returns
  • np.array: The confidence bounds on the transition probabilities (|S| x |A| x 1 with the Chernoff bound, |S| x |A| x |S| with the Bernstein bound).
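
Similarly, the Bernstein branch can be reproduced with the module-level bernstein helper and a toy transition estimate (all values below are purely illustrative):

import math
import numpy as np
from colosseum.agent.agents.infinite_horizon.ucrl2 import bernstein

S, A = 3, 2
alpha_p, delta, iteration = 1.0, 0.01, 1000
P = np.full((S, A, S), 1.0 / S)                          # uniform estimated transitions
nb_observations = np.array([[10, 0], [3, 7], [1, 20]])
N = np.maximum(1, nb_observations)
Nm1 = np.maximum(1, nb_observations - 1)
var_p = P * (1.0 - P)                                    # per-entry Bernoulli variance
log_value = 2.0 * S * A * (iteration + 1) / delta
beta = bernstein(
    scale_a=14 * var_p / N[:, :, np.newaxis],
    log_scale_a=log_value,
    scale_b=49.0 / (3.0 * Nm1[:, :, np.newaxis]),
    log_scale_b=log_value,
    alpha_1=math.sqrt(alpha_p),
    alpha_2=alpha_p,
)
print(beta.shape)  # (3, 2, 3): one bound per (state, action, next state) triple
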
def solve_optimistic_model(self) -> Optional[float]:

runs extended value iteration on the optimistic model of the MDP.

Returns
  • Union[None, float]: The span value of the estimates from the optimistic value iteration or None if no solution has been found.