colosseum.hardness.analysis

import os
from copy import deepcopy
from dataclasses import dataclass
from multiprocessing import Pool
from typing import (
    TYPE_CHECKING,
    Any,
    Dict,
    Iterable,
    Type,
    Tuple,
    Callable,
    Union,
    Collection,
)

import pandas as pd
import seaborn as sns
from matplotlib import pyplot as plt

from colosseum import config
from colosseum.agent.agents.episodic import PSRLEpisodic
from colosseum.agent.agents.infinite_horizon import UCRL2Continuous
from colosseum.mdp.base import BaseMDP

sns.set_theme()

if TYPE_CHECKING:
    from colosseum.agent.agents.base import BaseAgent


@dataclass()
class SingleInstanceHyperOptConfig:
    mdp_class: Type["BaseMDP"]
    """The class of the MDP with respect to which we are performing the parameters optimization."""
    mdp_parameters: Dict[str, Any]
    """The dictionary containing the parameters for the MDP class with respect to which we are performing the parameters optimization."""
    n_seed: int
    """The number of seeds for which the agent/MDP interaction is repeated."""
    optimization_horizon: int
    """The optimization horizon of the agent/MDP interaction."""
    num_samples: int
    """The number of samples of the agent parameters."""
    max_interaction_s: float
    """The maximum number of seconds allocated to training the agent."""
    log_every: int
    """The length of the interval between logging of the performance indicators."""
    episodic_near_optimal_agent_class: Type["BaseAgent"] = PSRLEpisodic
    """The class of the near optimal agent for the episodic setting. By default, it is ``PSRLEpisodic``."""
    continuous_near_optimal_agent_class: Type["BaseAgent"] = UCRL2Continuous
    """The class of the near optimal agent for the continuous setting. By default, it is ``UCRL2Continuous``."""


@dataclass()
class HardnessAnalysisParams:
    mdp_class: Type["BaseMDP"]
    """The class of the MDP whose hardness we are studying."""
    varying_params_name: str
    """The name of the parameter being varied."""
    varying_params_values: Iterable
    """The values of the parameter being varied."""
    fixed_params: Dict[str, Any]
    """The dictionary containing the names and values for the parameters being kept fixed."""
    n_seeds_mdp: int
    """The number of seeds used when instantiating the MDP."""
    hardness_measures: Collection[Union[str, Callable[[Type["BaseMDP"]], float]]] = (
        "diameter",
        "value_norm",
    )
    """An iterable containing either the code name of an available measure of hardness or a function that takes an MDP
    object as input and returns a value."""
    near_optimal_agent_hyperopt_config: SingleInstanceHyperOptConfig = None
    """The parameters optimization configuration for the near optimal agent. By default, it is None, which means
    that the regret of the near optimal agent with tuned parameters is not computed and used as a proxy for a
    complete measure of hardness."""
    varying_params_name_clean: str = None
    """The name of the parameter being varied in a clean format."""
    retrieve_from_cache: bool = True
    """If True, the ``config.get_hardness_measures_cache_folder()`` is searched for a cached value of the measure.
    By default, it is True."""

    @property
    def clean_varying_prm_name(self) -> str:
        """
        Returns
        -------
        str
            A nicely formatted name for the varying parameter.
        """
        if self.varying_params_name_clean is None:
            return self.varying_params_name
        return self.varying_params_name_clean


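Usage sketch (not part of the module source): a scenario is specified by choosing an MDP class, the parameter to vary, and the measures to compute. In the sketch below, MyMDPClass and its constructor parameters are hypothetical placeholders for any MDP class available in colosseum; the field names are the ones defined by the dataclass above.

# Hedged example: replace MyMDPClass with an actual colosseum MDP class.
hap = HardnessAnalysisParams(
    mdp_class=MyMDPClass,                    # hypothetical MDP class
    varying_params_name="size",              # "size" is treated specially below: the number of states is reported
    varying_params_values=(5, 10, 15),       # illustrative values
    fixed_params=dict(),                     # keyword arguments of MyMDPClass kept fixed (assumption)
    n_seeds_mdp=3,
    hardness_measures=("diameter", "value_norm"),
    varying_params_name_clean="Number of states",
)
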
def run_scenario_analysis(
    hap: HardnessAnalysisParams,
    ax=None,
):
    """
    runs a hardness analysis scenario.

    Parameters
    ----------
    hap : HardnessAnalysisParams
        The hardness analysis scenario to run.
    ax : plt.Axes
        The axes object on which the plot is drawn. By default, a new axes object is created.
    """

    show = ax is None
    if ax is None:
        fig, ax = plt.subplots()

    dfs = get_varying_parameter_dfs(hap, normalize_measures=True)

    for measure_name, df in dfs.items():
        sns.lineplot(
            x=hap.clean_varying_prm_name,
            y=measure_name,
            data=df,
            ax=ax,
            label=measure_name,
        )
    plt.ylabel("Hardness measure value")
    plt.legend()

    if show:
        plt.show()


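For illustration (a sketch under the assumptions of the previous example), the scenario can be plotted either on a fresh figure or on an existing axes:

run_scenario_analysis(hap)            # creates a new axes and calls plt.show()

fig, ax = plt.subplots(figsize=(6, 4))
run_scenario_analysis(hap, ax=ax)     # draws on the given axes without showing the figure
fig.savefig("hardness_analysis.pdf")  # hypothetical output path
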
def get_varying_parameter_dfs(
    hap: HardnessAnalysisParams,
    normalize_measures: bool = False,
) -> Dict[str, pd.DataFrame]:
    """
    computes the hardness measures for the scenario and stores them in `pd.DataFrame`s.

    Parameters
    ----------
    hap : HardnessAnalysisParams
        The hardness analysis scenario to run.
    normalize_measures : bool
        If True, the values of the hardness measures are normalized.

    Returns
    -------
    Dict[str, pd.DataFrame]
        A dictionary that associates the name of each hardness measure with its corresponding pd.DataFrame.
    """

    if hap.near_optimal_agent_hyperopt_config is not None:
        raise NotImplementedError(
            "The computation of the regret of the near optimal agent as a proxy for a complete measure of hardness is "
            "being refactored at the moment."
        )

    measure_results = dict()
    if config.get_available_cores() > 1:
        inputs = [
            (
                hap.mdp_class,
                hap.fixed_params,
                hap.varying_params_name,
                varying_value,
                seed,
                measure,
                hap.retrieve_from_cache,
            )
            for varying_value in hap.varying_params_values
            for measure in hap.hardness_measures
            for seed in range(hap.n_seeds_mdp)
        ]

        with Pool(processes=config.get_available_cores()) as p:
            for measure_name, varying_value, seed, measure_value in p.starmap_async(
                _compute_hardness_measure, inputs
            ).get():
                _add_result(
                    measure_results, measure_name, varying_value, seed, measure_value
                )
    else:
        for seed in range(hap.n_seeds_mdp):
            for measure in hap.hardness_measures:
                for varying_value in hap.varying_params_values:
                    out = compute_hardness_measure_for_varying_prm(
                        hap.mdp_class,
                        hap.fixed_params,
                        hap.varying_params_name,
                        varying_value,
                        seed,
                        measure,
                        force_single_core=True,
                        retrieve_from_cache=hap.retrieve_from_cache,
                        return_n_states=hap.varying_params_name == "size",
                    )
                    if hap.varying_params_name == "size":
                        measure_name, measure_value, n_states = out
                    else:
                        measure_name, measure_value = out

                    _add_result(
                        measure_results,
                        measure_name,
                        varying_value
                        if hap.varying_params_name != "size"
                        else n_states,
                        seed,
                        measure_value,
                    )

    for measure_name in measure_results:
        # Create a Pandas DataFrame
        df = pd.DataFrame.from_dict(measure_results[measure_name])

        # Normalize the values if required
        if normalize_measures:
            min_value = df.loc[:, measure_name].min()
            max_value = df.loc[:, measure_name].max()
            if max_value > min_value + 1e-4:
                df.loc[:, measure_name] = (df.loc[:, measure_name] - min_value) / (
                    max_value - min_value
                )
            else:
                df.loc[:, measure_name] = 0.5

        # Clean the varying parameter name
        df = df.rename(columns={"Varying value": hap.clean_varying_prm_name}).set_index(
            hap.clean_varying_prm_name
        )

        measure_results[measure_name] = df

    return measure_results


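A hedged sketch of how the returned dictionary can be inspected. The keys follow the formatting applied by _add_result below, so "diameter" becomes "Diameter" and "value_norm" becomes "Value norm"; each DataFrame is indexed by the (cleaned) varying parameter and contains the measure values and the MDP seeds.

dfs = get_varying_parameter_dfs(hap, normalize_measures=True)
for measure_name, df in dfs.items():
    print(measure_name)                                    # e.g. "Diameter" or "Value norm"
    print(df.columns.tolist())                             # [measure_name, "Seed"]
    print(df.groupby(df.index.name)[measure_name].mean())  # average across MDP seeds
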
def _compute_hardness_measure(
    mdp_class,
    fixed_params,
    varying_params_name,
    varying_value,
    seed,
    measure,
    retrieve_from_cache,
    return_n_states: bool = False,
):
    measure_name, measure_value = compute_hardness_measure_for_varying_prm(
        mdp_class,
        fixed_params,
        varying_params_name,
        varying_value,
        seed,
        measure,
        force_single_core=True,
        retrieve_from_cache=retrieve_from_cache,
    )
    return measure_name, varying_value, seed, measure_value


def compute_hardness_measure_for_varying_prm(
    mdp_class: Type["BaseMDP"],
    fixed_params: Dict[str, Any],
    varying_params_name: str,
    varying_value: Any,
    seed: int,
    measure: Union[str, Callable[[BaseMDP], float]],
    force_single_core: bool = False,
    retrieve_from_cache: bool = True,
    folder: str = None,
    return_n_states: bool = False,
) -> Tuple[str, float, int]:
    """
    computes the hardness measure for the given value of the varying parameter.

    Parameters
    ----------
    mdp_class : Type["BaseMDP"]
        The MDP class for which the measure will be computed.
    fixed_params : Dict[str, Any]
        The parameters of the MDP that are being kept fixed.
    varying_params_name : str
        The name of the varying parameter.
    varying_value : Any
        The value of the parameter that is varying.
    seed : int
        The random seed.
    measure : Union[str, Callable[[BaseMDP], float]]
        The measure to be computed. It can be given as a function from MDP instances to float or as a string. If given
        as a string, it is looked up among the measures available in the package.
    force_single_core : bool
        If True, the computation of the measure is forced to use only a single core. Note that this is not enforced when
        the measure is given as a function. By default, single processing is not enforced.
    retrieve_from_cache : bool
        If True, the function will look for cached values of the measure. Note that this also holds if the measure is
        given as a function.
    folder : str
        The folder where cached values are looked for. By default, it is `config.get_hardness_measures_cache_folder()`.
    return_n_states : bool
        If True, the number of states is returned.

    Returns
    -------
    str
        The nicely formatted name of the measure.
    float
        The value of the measure.
    int, optional
        The number of states.
    """

    # Instantiate the MDP parameters
    mdp_kwargs = deepcopy(fixed_params)
    mdp_kwargs["seed"] = seed
    mdp_kwargs[varying_params_name] = varying_value

    return compute_hardness_measure(
        mdp_class,
        mdp_kwargs,
        measure,
        force_single_core,
        retrieve_from_cache,
        folder,
        True,
        return_n_states,
    )


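For example (a sketch with the same hypothetical MyMDPClass as above), the diameter of a single configuration can be computed as follows. The measure name returned here is the raw code name, e.g. "diameter".

name, value = compute_hardness_measure_for_varying_prm(
    MyMDPClass,                  # hypothetical MDP class
    fixed_params=dict(),         # parameters kept fixed (assumed accepted by MyMDPClass)
    varying_params_name="size",
    varying_value=10,
    seed=0,
    measure="diameter",
)
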
def compute_hardness_measure(
    mdp_class: Type["BaseMDP"],
    mdp_params: Dict[str, Any],
    measure: Union[str, Callable[[BaseMDP], float]],
    force_single_core: bool = False,
    retrieve_from_cache: bool = True,
    folder: str = None,
    return_measure_name: bool = False,
    return_n_states: bool = False,
) -> Union[float, Tuple[str, float], Tuple[float, int], Tuple[str, float, int]]:
    """
    computes the hardness measure for an MDP instantiated from the given class and parameters.

    Parameters
    ----------
    mdp_class : Type["BaseMDP"]
        The MDP class for which the measure will be computed.
    mdp_params : Dict[str, Any]
        The parameters for the MDP.
    measure : Union[str, Callable[[BaseMDP], float]]
        The measure to be computed. It can be given as a function from MDP instances to float or as a string. If given
        as a string, it is looked up among the measures available in the package.
    force_single_core : bool
        If True, the computation of the measure is forced to use only a single core. Note that this is not enforced when
        the measure is given as a function. By default, single processing is not enforced.
    retrieve_from_cache : bool
        If True, the function will look for cached values of the measure. Note that this also holds if the measure is
        given as a function.
    folder : str
        The folder where cached values are looked for. By default, it is `config.get_hardness_measures_cache_folder()`.
    return_measure_name : bool
        If True, a nicely formatted name for the measure is returned.
    return_n_states : bool
        If True, the number of states is returned.

    Returns
    -------
    str, optional
        The nicely formatted name of the measure.
    float
        The value of the measure.
    int, optional
        The number of states.
    """

    # Obtain the name of the measure and the function to compute it
    measure_name, measure_f = _process_measure(measure)

    # Check if the measure has already been computed
    if retrieve_from_cache:
        mdp_shell = mdp_class(
            **mdp_params, instantiate_mdp=False, exclude_horizon_from_parameters=True
        )
        if folder is None:
            folder = (
                config.get_hardness_measures_cache_folder()
                + mdp_class.__name__
                + os.sep
            )

        measure_file_path = f"{folder}{measure_name}_{mdp_shell.hash}.txt"
        if os.path.isfile(measure_file_path):
            with open(measure_file_path, "r") as f:
                measure_value = float(f.read())

            out = [measure_value]
            if return_measure_name:
                out.insert(0, measure_name)
            if return_n_states:
                mdp_shell.instantiate_MDP()
                out.append(mdp_shell.n_states)
            return out

    # Possibly force the computation to avoid using multiple cores
    if force_single_core and config.get_available_cores() > 1:
        available_cores = config.get_available_cores()
        config.disable_multiprocessing()
        mdp = mdp_class(**mdp_params)
        measure_value = measure_f(mdp)
        config.set_available_cores(available_cores)
    else:
        mdp = mdp_class(**mdp_params)
        measure_value = measure_f(mdp)

    # Cache the value of the measure (only if we were supposed to look for it in the first place)
    if retrieve_from_cache:
        os.makedirs(os.path.dirname(measure_file_path), exist_ok=True)
        with open(measure_file_path, "w") as f:
            f.write(str(measure_value))

    out = [measure_value]
    if return_measure_name:
        out.insert(0, measure_name)
    if return_n_states:
        out.append(mdp.n_states)
    return out


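Besides the measures available by name, a custom callable can be passed. A minimal sketch, assuming that MDP instances expose the n_states attribute used elsewhere in this module; the function name is used as the measure name by _process_measure, and the value is cached like any other measure.

def state_count(mdp) -> float:
    # any function mapping an MDP instance to a float is a valid measure
    return float(mdp.n_states)

value, n_states = compute_hardness_measure(
    MyMDPClass,            # hypothetical MDP class
    dict(seed=0),          # the MDP parameters, including the seed
    measure=state_count,
    return_n_states=True,
)
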
def _process_measure(
    measure: Union[str, Callable[[BaseMDP], float]]
) -> Tuple[str, Callable[[BaseMDP], float]]:
    if type(measure) == str:
        measure_name = measure
        if measure_name not in BaseMDP.get_available_hardness_measures():
            raise ValueError(
                f"{measure} is not a valid hardness measure, the available ones are: "
                f"{BaseMDP.get_available_hardness_measures()}"
            )
        return measure, lambda mdp: mdp.get_measure_from_name(measure)
    elif callable(measure):
        return measure.__name__, measure
    else:
        raise ValueError(
            f"The measure should either be a string or a Callable, {type(measure)} received."
        )


def _add_result(measure_results, measure_name, varying_value, seed, measure_value):
    measure_name = measure_name.capitalize().replace("_", " ")

    if measure_name not in measure_results:
        measure_results[measure_name] = {
            "Varying value": [],
            measure_name: [],
            "Seed": [],
        }
    measure_results[measure_name]["Varying value"].append(varying_value)
    measure_results[measure_name]["Seed"].append(seed)
    measure_results[measure_name][measure_name].append(measure_value)
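
For reference, a hedged illustration of the intermediate structure that _add_result accumulates before get_varying_parameter_dfs converts it into DataFrames (the measure values below are made up):

measure_results = dict()
_add_result(measure_results, "diameter", varying_value=5, seed=0, measure_value=3.0)
_add_result(measure_results, "diameter", varying_value=5, seed=1, measure_value=3.2)
# measure_results is now:
# {"Diameter": {"Varying value": [5, 5], "Diameter": [3.0, 3.2], "Seed": [0, 1]}}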