colosseum.experiment.experiment_instances

  1import os
  2import pickle
  3import shutil
  4from multiprocessing import Pool
  5from typing import List, Union, TYPE_CHECKING
  6
  7import numpy as np
  8import ray
  9from ray.util.multiprocessing import Pool as RayPool
 10from tqdm import tqdm
 11
 12from colosseum import config
 13from colosseum.benchmark.utils import retrieve_experiment_config
 14from colosseum.experiment.agent_mdp_interaction import MDPLoop
 15from colosseum.experiment.experiment_instance import ExperimentInstance
 16from colosseum.experiment.folder_structuring import _get_experiment_mdp_agent_couples
 17from colosseum.experiment.folder_structuring import get_mdp_agent_gin_configs
 18from colosseum.experiment.utils import apply_gin_config
 19from colosseum.experiment.utils import check_experiment_folder
 20from colosseum.utils import ensure_folder, make_mdp_spec
 21from colosseum.utils.acme import CSVLogger
 22
 23if TYPE_CHECKING:
 24    pass
 25
 26
 27def get_experiment_instances_from_folder(
 28    experiment_folder: str,
 29) -> List[ExperimentInstance]:
 30    """
 31    Returns
 32    -------
 33    List[ExperimentInstance]
 34        The `ExperimentInstance`s associated with the experiment folder.
 35    """
 36
 37    if config.VERBOSE_LEVEL != 0:
 38        print(f"\t- {experiment_folder}")
 39
 40    # Retrieve experiment configuration from dictionary in input or from .yml file
 41    experiment_config = retrieve_experiment_config(experiment_folder)
 42
 43    # Check that every experiment folder is well-structured
 44    check_experiment_folder(experiment_folder, experiment_config)
 45
 46    (
 47        mdp_classes_scopes,
 48        agent_classes_scopes,
 49        gin_config_files_paths,
 50    ) = get_mdp_agent_gin_configs(experiment_folder)
 51
 52    assert (
 53        len(mdp_classes_scopes) > 0
 54    ), f"No MDP gin configurations find in {experiment_folder}"
 55    assert (
 56        len(agent_classes_scopes) > 0
 57    ), f"No agent gin configurations find in {experiment_folder}"
 58    assert (
 59        len(gin_config_files_paths) > 0
 60    ), f"No gin configurations find in {experiment_folder}"
 61
 62    return _get_experiment_mdp_agent_couples(
 63        experiment_config,
 64        experiment_folder,
 65        mdp_classes_scopes,
 66        agent_classes_scopes,
 67        gin_config_files_paths,
 68    )
 69
 70
 71def save_instances_to_folder(
 72    experiment_instances: List[ExperimentInstance],
 73    store_instances_folder: str,
 74    overwrite=False,
 75) -> List[str]:
 76    """
 77    Parameters
 78    ----------
 79    experiment_instances : List[ExperimentInstance]
 80        The `ExperimentInstance`s to be store locally.
 81    store_instances_folder : str
 82        The folder where the `ExperimentInstance`s are to be stored.
 83    overwrite : bool
 84        If True, any file in store_instances_folder is removed. If False, it raises an error if store_instances_folder
 85        contains some files.
 86
 87    Returns
 88    -------
 89    List[str]
 90        The paths of the pickled `ExperimentInstance`s.
 91    """
 92
 93    # Prepare the folder to store the pickled experiment instances
 94    if (
 95        os.path.isdir(store_instances_folder)
 96        and len(os.listdir(store_instances_folder)) > 0
 97    ):
 98        if overwrite:
 99            shutil.rmtree(store_instances_folder)
100        else:
101            raise ValueError(
102                f"The store_instances_folder is not empty, {store_instances_folder}"
103            )
104    os.makedirs(store_instances_folder, exist_ok=True)
105
106    # Pickle the experiments instances to be run later
107    experiment_instance_paths = []
108    for i, exp_ins in enumerate(experiment_instances):
109        fp = ensure_folder(store_instances_folder) + f"exp_inst_{i+1}.pkl"
110        experiment_instance_paths.append(fp)
111        with open(fp, "wb") as f:
112            pickle.dump(exp_ins, f)
113
114    return experiment_instance_paths
115
116
def run_experiment_instances(
    experiment_instances: List[Union[ExperimentInstance, str]],
    use_ray=False,
):
    """
    Runs the `ExperimentInstance`s locally. If multiprocessing is enabled in the package configuration, multiple cores
    will be used to simultaneously run several `ExperimentInstance`s.

    The instances are run in a fixed pseudo-random order (seed 42). The list given in input is left untouched.
    While a pool of workers is running, the package multiprocessing is disabled; the previous number of available
    cores is restored afterwards, even if one of the runs raises an error.

    Parameters
    ----------
    experiment_instances : List[Union[ExperimentInstance, str]]
        The `ExperimentInstance`s to run. They can either be `ExperimentInstance` objects or paths to the pickled
        `ExperimentInstance`s.
    use_ray : bool
        If True, it uses `ray` to handle the multiprocessing. By default, the Python default `multiprocessing` module is
        used.
    """

    if len(experiment_instances) == 0:
        return

    # Shuffle can improve speed of multiprocessing. A copy is shuffled so that the caller's list is not reordered.
    instances = list(experiment_instances)
    np.random.RandomState(42).shuffle(instances)
    n_instances = len(instances)

    def report_progress(n_done):
        if config.VERBOSE_LEVEL != 0:
            tqdm.write(f"Completed: {n_done}/{n_instances}")

    def run_in_pool(pool):
        # Results arrive in completion order; only their number is needed for the progress report
        for n_done, _ in enumerate(
            pool.imap_unordered(run_experiment_instance, instances), start=1
        ):
            report_progress(n_done)

    report_progress(0)

    colosseum_cores_config = config.get_available_cores()
    if not (n_instances >= colosseum_cores_config > 1):
        # Not enough instances (or cores) to make a pool worthwhile: run sequentially
        for n_done, experiment_instance in enumerate(instances, start=1):
            run_experiment_instance(experiment_instance)
            report_progress(n_done)
        return

    # Ensure that Colosseum does not use multiprocessing while we run the experiments
    config.disable_multiprocessing()
    try:
        if use_ray:
            ray.init(num_cpus=colosseum_cores_config)
            try:
                with RayPool(processes=colosseum_cores_config) as p:
                    run_in_pool(p)
            finally:
                # Release the ray resources even when a run fails
                ray.shutdown()
        else:
            with Pool(processes=colosseum_cores_config) as p:
                run_in_pool(p)
    finally:
        # Restore the previous configuration, also when a run fails
        config.set_available_cores(colosseum_cores_config)
176
177
def run_experiment_instance(exp_ins: Union[ExperimentInstance, str]):
    """
    Runs a single `ExperimentInstance`, which can be passed as an `ExperimentInstance` object or as a path to a
    pickled `ExperimentInstance` object.

    The MDP and the agent are instantiated inside their gin configuration scopes, the agent/MDP interaction is run
    with a `CSVLogger`, and a line is appended to `time_exceeded.txt` in the logger directory when the loop reports a
    last training step different from -1.

    Parameters
    ----------
    exp_ins : Union[ExperimentInstance, str]
        The `ExperimentInstance` to run, or the path to a pickled `ExperimentInstance`.

    Returns
    -------
    ExperimentInstance
        The `ExperimentInstance` that has been run (the unpickled object when a path was given).
    """

    # Load the experiment instance if a path is given.
    # NOTE: unpickling can execute arbitrary code, so only paths to trusted files should be passed.
    if isinstance(exp_ins, str):
        with open(exp_ins, "rb") as f:
            exp_ins = pickle.load(f)

    import gin

    apply_gin_config(exp_ins.gin_config_files)

    with gin.config_scope(exp_ins.mdp_scope):
        mdp = exp_ins.mdp_class(
            seed=exp_ins.seed,
            emission_map=exp_ins.emission_map,
        )
    with gin.config_scope(exp_ins.agent_scope):
        agent = exp_ins.agent_class(
            seed=exp_ins.seed,
            mdp_specs=make_mdp_spec(mdp),
            optimization_horizon=exp_ins.experiment_config.n_steps,
        )

    logger = CSVLogger(
        exp_ins.result_folder,
        add_uid=False,
        label=exp_ins.experiment_label,
        file_name=f"seed{exp_ins.seed}_logs",
    )
    loop = MDPLoop(mdp, agent, logger)
    last_training_step, _ = loop.run(
        exp_ins.experiment_config.n_steps,
        exp_ins.experiment_config.log_performance_indicators_every,
        exp_ins.experiment_config.max_interaction_time_s,
    )

    # NOTE(review): presumably a value different from -1 signals that the interaction time budget was exhausted
    # before the end of the run - confirm against MDPLoop.run.
    if last_training_step != -1:
        with open(f"{logger._directory}{os.sep}time_exceeded.txt", "a") as f:
            f.write(
                f"last training step at ({last_training_step}) for {logger.file_path}\n"
            )
    return exp_ins
def get_experiment_instances_from_folder( experiment_folder: str) -> List[colosseum.experiment.experiment_instance.ExperimentInstance]:
def get_experiment_instances_from_folder(
    experiment_folder: str,
) -> List[ExperimentInstance]:
    """
    Builds the `ExperimentInstance`s described by an experiment folder.

    The experiment configuration is retrieved for the folder, the folder structure is checked, and the MDP and
    agent gin configurations found in the folder are turned into `ExperimentInstance`s.

    Parameters
    ----------
    experiment_folder : str
        The experiment folder to load.

    Returns
    -------
    List[ExperimentInstance]
        The `ExperimentInstance`s associated with the experiment folder.

    Raises
    ------
    AssertionError
        If no MDP gin configuration, no agent gin configuration, or no gin configuration file is found in the
        experiment folder.
    """

    if config.VERBOSE_LEVEL != 0:
        print(f"\t- {experiment_folder}")

    # Retrieve the experiment configuration associated with the folder
    experiment_config = retrieve_experiment_config(experiment_folder)

    # Check that the experiment folder is well-structured
    check_experiment_folder(experiment_folder, experiment_config)

    (
        mdp_classes_scopes,
        agent_classes_scopes,
        gin_config_files_paths,
    ) = get_mdp_agent_gin_configs(experiment_folder)

    # Explicit raises are used instead of `assert` statements so that the checks are not stripped when Python runs
    # with the -O flag. AssertionError is kept so that existing callers catching it keep working.
    for found, description in (
        (mdp_classes_scopes, "MDP gin configurations"),
        (agent_classes_scopes, "agent gin configurations"),
        (gin_config_files_paths, "gin configurations"),
    ):
        if len(found) == 0:
            raise AssertionError(f"No {description} found in {experiment_folder}")

    return _get_experiment_mdp_agent_couples(
        experiment_config,
        experiment_folder,
        mdp_classes_scopes,
        agent_classes_scopes,
        gin_config_files_paths,
    )
Returns
  • List[ExperimentInstance]: The ExperimentInstances associated with the experiment folder.
def save_instances_to_folder( experiment_instances: List[colosseum.experiment.experiment_instance.ExperimentInstance], store_instances_folder: str, overwrite=False) -> List[str]:
 72def save_instances_to_folder(
 73    experiment_instances: List[ExperimentInstance],
 74    store_instances_folder: str,
 75    overwrite=False,
 76) -> List[str]:
 77    """
 78    Parameters
 79    ----------
 80    experiment_instances : List[ExperimentInstance]
 81        The `ExperimentInstance`s to be store locally.
 82    store_instances_folder : str
 83        The folder where the `ExperimentInstance`s are to be stored.
 84    overwrite : bool
 85        If True, any file in store_instances_folder is removed. If False, it raises an error if store_instances_folder
 86        contains some files.
 87
 88    Returns
 89    -------
 90    List[str]
 91        The paths of the pickled `ExperimentInstance`s.
 92    """
 93
 94    # Prepare the folder to store the pickled experiment instances
 95    if (
 96        os.path.isdir(store_instances_folder)
 97        and len(os.listdir(store_instances_folder)) > 0
 98    ):
 99        if overwrite:
100            shutil.rmtree(store_instances_folder)
101        else:
102            raise ValueError(
103                f"The store_instances_folder is not empty, {store_instances_folder}"
104            )
105    os.makedirs(store_instances_folder, exist_ok=True)
106
107    # Pickle the experiments instances to be run later
108    experiment_instance_paths = []
109    for i, exp_ins in enumerate(experiment_instances):
110        fp = ensure_folder(store_instances_folder) + f"exp_inst_{i+1}.pkl"
111        experiment_instance_paths.append(fp)
112        with open(fp, "wb") as f:
113            pickle.dump(exp_ins, f)
114
115    return experiment_instance_paths
Parameters
  • experiment_instances (List[ExperimentInstance]): The ExperimentInstances to be stored locally.
  • store_instances_folder (str): The folder where the ExperimentInstances are to be stored.
  • overwrite (bool): If True, any file in store_instances_folder is removed. If False, it raises an error if store_instances_folder contains some files.
Returns
  • List[str]: The paths of the pickled ExperimentInstances.
def run_experiment_instances( experiment_instances: List[Union[colosseum.experiment.experiment_instance.ExperimentInstance, str]], use_ray=False):
def run_experiment_instances(
    experiment_instances: List[Union[ExperimentInstance, str]],
    use_ray=False,
):
    """
    Runs the `ExperimentInstance`s locally. If multiprocessing is enabled in the package configuration, multiple cores
    will be used to simultaneously run several `ExperimentInstance`s.

    The instances are run in a fixed pseudo-random order (seed 42). The list given in input is left untouched.
    While a pool of workers is running, the package multiprocessing is disabled; the previous number of available
    cores is restored afterwards, even if one of the runs raises an error.

    Parameters
    ----------
    experiment_instances : List[Union[ExperimentInstance, str]]
        The `ExperimentInstance`s to run. They can either be `ExperimentInstance` objects or paths to the pickled
        `ExperimentInstance`s.
    use_ray : bool
        If True, it uses `ray` to handle the multiprocessing. By default, the Python default `multiprocessing` module is
        used.
    """

    if len(experiment_instances) == 0:
        return

    # Shuffle can improve speed of multiprocessing. A copy is shuffled so that the caller's list is not reordered.
    instances = list(experiment_instances)
    np.random.RandomState(42).shuffle(instances)
    n_instances = len(instances)

    def report_progress(n_done):
        if config.VERBOSE_LEVEL != 0:
            tqdm.write(f"Completed: {n_done}/{n_instances}")

    def run_in_pool(pool):
        # Results arrive in completion order; only their number is needed for the progress report
        for n_done, _ in enumerate(
            pool.imap_unordered(run_experiment_instance, instances), start=1
        ):
            report_progress(n_done)

    report_progress(0)

    colosseum_cores_config = config.get_available_cores()
    if not (n_instances >= colosseum_cores_config > 1):
        # Not enough instances (or cores) to make a pool worthwhile: run sequentially
        for n_done, experiment_instance in enumerate(instances, start=1):
            run_experiment_instance(experiment_instance)
            report_progress(n_done)
        return

    # Ensure that Colosseum does not use multiprocessing while we run the experiments
    config.disable_multiprocessing()
    try:
        if use_ray:
            ray.init(num_cpus=colosseum_cores_config)
            try:
                with RayPool(processes=colosseum_cores_config) as p:
                    run_in_pool(p)
            finally:
                # Release the ray resources even when a run fails
                ray.shutdown()
        else:
            with Pool(processes=colosseum_cores_config) as p:
                run_in_pool(p)
    finally:
        # Restore the previous configuration, also when a run fails
        config.set_available_cores(colosseum_cores_config)

Runs the ExperimentInstances locally. If multiprocessing is enabled in the package configuration, multiple cores will be used to simultaneously run several ExperimentInstances.

Parameters
  • experiment_instances (List[Union[ExperimentInstance, str]]): The ExperimentInstances to run. They can either be ExperimentInstance objects or paths to the pickled ExperimentInstances.
  • use_ray (bool): If True, it uses ray to handle the multiprocessing. By default, the Python default multiprocessing module is used.
def run_experiment_instance( exp_ins: Union[colosseum.experiment.experiment_instance.ExperimentInstance, str]):
def run_experiment_instance(exp_ins: Union[ExperimentInstance, str]):
    """
    Runs a single `ExperimentInstance`, which can be passed as an `ExperimentInstance` object or as a path to a
    pickled `ExperimentInstance` object.

    The MDP and the agent are instantiated inside their gin configuration scopes, the agent/MDP interaction is run
    with a `CSVLogger`, and a line is appended to `time_exceeded.txt` in the logger directory when the loop reports a
    last training step different from -1.

    Parameters
    ----------
    exp_ins : Union[ExperimentInstance, str]
        The `ExperimentInstance` to run, or the path to a pickled `ExperimentInstance`.

    Returns
    -------
    ExperimentInstance
        The `ExperimentInstance` that has been run (the unpickled object when a path was given).
    """

    # Load the experiment instance if a path is given.
    # NOTE: unpickling can execute arbitrary code, so only paths to trusted files should be passed.
    if isinstance(exp_ins, str):
        with open(exp_ins, "rb") as f:
            exp_ins = pickle.load(f)

    import gin

    apply_gin_config(exp_ins.gin_config_files)

    with gin.config_scope(exp_ins.mdp_scope):
        mdp = exp_ins.mdp_class(
            seed=exp_ins.seed,
            emission_map=exp_ins.emission_map,
        )
    with gin.config_scope(exp_ins.agent_scope):
        agent = exp_ins.agent_class(
            seed=exp_ins.seed,
            mdp_specs=make_mdp_spec(mdp),
            optimization_horizon=exp_ins.experiment_config.n_steps,
        )

    logger = CSVLogger(
        exp_ins.result_folder,
        add_uid=False,
        label=exp_ins.experiment_label,
        file_name=f"seed{exp_ins.seed}_logs",
    )
    loop = MDPLoop(mdp, agent, logger)
    last_training_step, _ = loop.run(
        exp_ins.experiment_config.n_steps,
        exp_ins.experiment_config.log_performance_indicators_every,
        exp_ins.experiment_config.max_interaction_time_s,
    )

    # NOTE(review): presumably a value different from -1 signals that the interaction time budget was exhausted
    # before the end of the run - confirm against MDPLoop.run.
    if last_training_step != -1:
        with open(f"{logger._directory}{os.sep}time_exceeded.txt", "a") as f:
            f.write(
                f"last training step at ({last_training_step}) for {logger.file_path}\n"
            )
    return exp_ins

Runs a single ExperimentInstance, which can be passed as an ExperimentInstance object or as a path to a pickled ExperimentInstance object.