colosseum.experiment.experiment_instances
import os
import pickle
import shutil
from multiprocessing import Pool
from typing import List, Union, TYPE_CHECKING

import numpy as np
import ray
from ray.util.multiprocessing import Pool as RayPool
from tqdm import tqdm

from colosseum import config
from colosseum.benchmark.utils import retrieve_experiment_config
from colosseum.experiment.agent_mdp_interaction import MDPLoop
from colosseum.experiment.experiment_instance import ExperimentInstance
from colosseum.experiment.folder_structuring import _get_experiment_mdp_agent_couples
from colosseum.experiment.folder_structuring import get_mdp_agent_gin_configs
from colosseum.experiment.utils import apply_gin_config
from colosseum.experiment.utils import check_experiment_folder
from colosseum.utils import ensure_folder, make_mdp_spec
from colosseum.utils.acme import CSVLogger

if TYPE_CHECKING:
    pass


def get_experiment_instances_from_folder(
    experiment_folder: str,
) -> List[ExperimentInstance]:
    """
    Builds the `ExperimentInstance`s described by an experiment folder.

    Parameters
    ----------
    experiment_folder : str
        The folder from which the experiment configuration and the gin configurations are retrieved.

    Returns
    -------
    List[ExperimentInstance]
        The `ExperimentInstance`s associated with the experiment folder.

    Raises
    ------
    AssertionError
        If no MDP scopes, no agent scopes, or no gin configuration files are found for the folder.
    """

    if config.VERBOSE_LEVEL != 0:
        print(f"\t- {experiment_folder}")

    # Retrieve the experiment configuration associated with the folder
    experiment_config = retrieve_experiment_config(experiment_folder)

    # Check that the experiment folder is well-structured
    check_experiment_folder(experiment_folder, experiment_config)

    (
        mdp_classes_scopes,
        agent_classes_scopes,
        gin_config_files_paths,
    ) = get_mdp_agent_gin_configs(experiment_folder)

    # The checks are raised explicitly instead of using `assert` so that they are not
    # stripped when Python runs with `-O`. AssertionError is kept so that callers
    # observe the same exception type as before.
    if len(mdp_classes_scopes) == 0:
        raise AssertionError(f"No MDP gin configurations find in {experiment_folder}")
    if len(agent_classes_scopes) == 0:
        raise AssertionError(
            f"No agent gin configurations find in {experiment_folder}"
        )
    if len(gin_config_files_paths) == 0:
        raise AssertionError(f"No gin configurations find in {experiment_folder}")

    return _get_experiment_mdp_agent_couples(
        experiment_config,
        experiment_folder,
        mdp_classes_scopes,
        agent_classes_scopes,
        gin_config_files_paths,
    )


def save_instances_to_folder(
    experiment_instances: List[ExperimentInstance],
    store_instances_folder: str,
    overwrite=False,
) -> List[str]:
    """
    Pickles each `ExperimentInstance` to its own file inside a folder.

    Parameters
    ----------
    experiment_instances : List[ExperimentInstance]
        The `ExperimentInstance`s to be stored locally.
    store_instances_folder : str
        The folder where the `ExperimentInstance`s are to be stored.
    overwrite : bool
        If True, a non-empty store_instances_folder is removed and recreated. If False, it raises an error if
        store_instances_folder contains some files.

    Returns
    -------
    List[str]
        The paths of the pickled `ExperimentInstance`s.

    Raises
    ------
    ValueError
        If store_instances_folder is not empty and overwrite is False.
    """

    # Prepare the folder to store the pickled experiment instances
    if (
        os.path.isdir(store_instances_folder)
        and len(os.listdir(store_instances_folder)) > 0
    ):
        if overwrite:
            shutil.rmtree(store_instances_folder)
        else:
            raise ValueError(
                f"The store_instances_folder is not empty, {store_instances_folder}"
            )
    os.makedirs(store_instances_folder, exist_ok=True)

    # Pickle the experiments instances to be run later
    experiment_instance_paths = []
    for i, exp_ins in enumerate(experiment_instances):
        fp = ensure_folder(store_instances_folder) + f"exp_inst_{i+1}.pkl"
        experiment_instance_paths.append(fp)
        with open(fp, "wb") as f:
            pickle.dump(exp_ins, f)

    return experiment_instance_paths


def run_experiment_instances(
    experiment_instances: List[Union[ExperimentInstance, str]],
    use_ray=False,
):
    """
    runs the `ExperimentInstance`s locally. If multiprocessing is enabled in the package configuration, multiple cores
    will be used to simultaneously run several `ExperimentInstance`s.

    Parameters
    ----------
    experiment_instances : List[Union[ExperimentInstance, str]]
        The `ExperimentInstance`s to run. They can either be `ExperimentInstance` objects or paths to the pickled
        `ExperimentInstance`s. The list passed in input is not modified.
    use_ray : bool
        If True, it uses `ray` to handle the multiprocessing. By default, the Python default `multiprocessing` module is
        used.
    """

    if len(experiment_instances) == 0:
        return

    # Shuffle can improve speed of multiprocessing. A copy is shuffled so that the
    # list owned by the caller keeps its order.
    experiment_instances = list(experiment_instances)
    np.random.RandomState(42).shuffle(experiment_instances)

    n_instances = len(experiment_instances)
    exp_done = 0
    if config.VERBOSE_LEVEL != 0:
        tqdm.write(f"Completed: {exp_done}/{n_instances}")

    colosseum_cores_config = config.get_available_cores()
    if n_instances >= colosseum_cores_config > 1:
        # Ensure that Colosseum does not use multiprocessing while we run the experiments
        config.disable_multiprocessing()
        try:
            if use_ray:
                ray.init(num_cpus=colosseum_cores_config)
                pool_class = RayPool
            else:
                pool_class = Pool
            try:
                with pool_class(processes=colosseum_cores_config) as p:
                    for _ in p.imap_unordered(
                        run_experiment_instance, experiment_instances
                    ):
                        exp_done += 1
                        if config.VERBOSE_LEVEL != 0:
                            tqdm.write(f"Completed: {exp_done}/{n_instances}")
            finally:
                # Shut ray down even when one of the workers raises
                if use_ray:
                    ray.shutdown()
        finally:
            # Restore the previous configuration, also when an experiment fails
            config.set_available_cores(colosseum_cores_config)
    else:
        for experiment_instance in experiment_instances:
            run_experiment_instance(experiment_instance)
            exp_done += 1
            if config.VERBOSE_LEVEL != 0:
                tqdm.write(f"Completed: {exp_done}/{n_instances}")


def run_experiment_instance(exp_ins: Union[ExperimentInstance, str]):
    """
    runs a single `ExperimentInstance`, which can be passed as `ExperimentInstance` object or as a path to a pickled
    `ExperimentInstance` object.

    Parameters
    ----------
    exp_ins : Union[ExperimentInstance, str]
        The `ExperimentInstance` to run, or the path to a file containing a pickled `ExperimentInstance`.

    Returns
    -------
    ExperimentInstance
        The `ExperimentInstance` that has been run (the unpickled object if a path was given).
    """

    # Load the experiment instance if a path is given
    if isinstance(exp_ins, str):
        # NOTE: unpickling can execute arbitrary code, so only paths to trusted
        # files should be given here.
        with open(exp_ins, "rb") as f:
            exp_ins = pickle.load(f)

    import gin

    apply_gin_config(exp_ins.gin_config_files)

    # The MDP and the agent are instantiated inside their own gin scopes
    with gin.config_scope(exp_ins.mdp_scope):
        mdp = exp_ins.mdp_class(
            seed=exp_ins.seed,
            emission_map=exp_ins.emission_map,
        )
    with gin.config_scope(exp_ins.agent_scope):
        agent = exp_ins.agent_class(
            seed=exp_ins.seed,
            mdp_specs=make_mdp_spec(mdp),
            optimization_horizon=exp_ins.experiment_config.n_steps,
        )

    logger = CSVLogger(
        exp_ins.result_folder,
        add_uid=False,
        label=exp_ins.experiment_label,
        file_name=f"seed{exp_ins.seed}_logs",
    )
    loop = MDPLoop(mdp, agent, logger)
    last_training_step, _ = loop.run(
        exp_ins.experiment_config.n_steps,
        exp_ins.experiment_config.log_performance_indicators_every,
        exp_ins.experiment_config.max_interaction_time_s,
    )

    # NOTE(review): -1 presumably means the interaction was not cut short by the
    # time limit; any other value is recorded in time_exceeded.txt - confirm
    # against MDPLoop.run.
    if last_training_step != -1:
        with open(f"{logger._directory}{os.sep}time_exceeded.txt", "a") as f:
            f.write(
                f"last training step at ({last_training_step}) for {logger.file_path}\n"
            )
    return exp_ins
def
get_experiment_instances_from_folder( experiment_folder: str) -> List[colosseum.experiment.experiment_instance.ExperimentInstance]:
def get_experiment_instances_from_folder(
    experiment_folder: str,
) -> List[ExperimentInstance]:
    """
    Builds the `ExperimentInstance`s described by an experiment folder.

    Parameters
    ----------
    experiment_folder : str
        The folder from which the experiment configuration and the gin configurations are retrieved.

    Returns
    -------
    List[ExperimentInstance]
        The `ExperimentInstance`s associated with the experiment folder.

    Raises
    ------
    AssertionError
        If no MDP scopes, no agent scopes, or no gin configuration files are found for the folder.
    """

    if config.VERBOSE_LEVEL != 0:
        print(f"\t- {experiment_folder}")

    # Retrieve the experiment configuration associated with the folder
    experiment_config = retrieve_experiment_config(experiment_folder)

    # Check that the experiment folder is well-structured
    check_experiment_folder(experiment_folder, experiment_config)

    (
        mdp_classes_scopes,
        agent_classes_scopes,
        gin_config_files_paths,
    ) = get_mdp_agent_gin_configs(experiment_folder)

    # The checks are raised explicitly instead of using `assert` so that they are not
    # stripped when Python runs with `-O`. AssertionError is kept so that callers
    # observe the same exception type as before.
    if len(mdp_classes_scopes) == 0:
        raise AssertionError(f"No MDP gin configurations find in {experiment_folder}")
    if len(agent_classes_scopes) == 0:
        raise AssertionError(
            f"No agent gin configurations find in {experiment_folder}"
        )
    if len(gin_config_files_paths) == 0:
        raise AssertionError(f"No gin configurations find in {experiment_folder}")

    return _get_experiment_mdp_agent_couples(
        experiment_config,
        experiment_folder,
        mdp_classes_scopes,
        agent_classes_scopes,
        gin_config_files_paths,
    )
Returns
- List[ExperimentInstance]: The
ExperimentInstance
s associated with the experiment folder.
def
save_instances_to_folder( experiment_instances: List[colosseum.experiment.experiment_instance.ExperimentInstance], store_instances_folder: str, overwrite=False) -> List[str]:
def save_instances_to_folder(
    experiment_instances: List[ExperimentInstance],
    store_instances_folder: str,
    overwrite=False,
) -> List[str]:
    """
    Pickles each `ExperimentInstance` to its own file inside a folder.

    Parameters
    ----------
    experiment_instances : List[ExperimentInstance]
        The `ExperimentInstance`s to be stored locally.
    store_instances_folder : str
        The folder where the `ExperimentInstance`s are to be stored.
    overwrite : bool
        If True, a non-empty store_instances_folder is removed and recreated. If False, it raises an error if
        store_instances_folder contains some files.

    Returns
    -------
    List[str]
        The paths of the pickled `ExperimentInstance`s.

    Raises
    ------
    ValueError
        If store_instances_folder is not empty and overwrite is False.
    """

    # A destination that already holds something is either wiped or rejected
    folder_has_content = (
        os.path.isdir(store_instances_folder)
        and len(os.listdir(store_instances_folder)) > 0
    )
    if folder_has_content:
        if not overwrite:
            raise ValueError(
                f"The store_instances_folder is not empty, {store_instances_folder}"
            )
        shutil.rmtree(store_instances_folder)
    os.makedirs(store_instances_folder, exist_ok=True)

    # One pickle file per instance, numbered starting from one
    pickled_paths = []
    for i, instance in enumerate(experiment_instances):
        file_path = ensure_folder(store_instances_folder) + f"exp_inst_{i+1}.pkl"
        pickled_paths.append(file_path)
        with open(file_path, "wb") as out:
            pickle.dump(instance, out)

    return pickled_paths
Parameters
- experiment_instances (List[ExperimentInstance]):
The
ExperimentInstance
s to be stored locally. - store_instances_folder (str):
The folder where the
ExperimentInstance
s are to be stored. - overwrite (bool): If True, any file in store_instances_folder is removed. If False, it raises an error if store_instances_folder contains some files.
Returns
- List[str]: The paths of the pickled
ExperimentInstance
s.
def
run_experiment_instances( experiment_instances: List[Union[colosseum.experiment.experiment_instance.ExperimentInstance, str]], use_ray=False):
def run_experiment_instances(
    experiment_instances: List[Union[ExperimentInstance, str]],
    use_ray=False,
):
    """
    runs the `ExperimentInstance`s locally. If multiprocessing is enabled in the package configuration, multiple cores
    will be used to simultaneously run several `ExperimentInstance`s.

    Parameters
    ----------
    experiment_instances : List[Union[ExperimentInstance, str]]
        The `ExperimentInstance`s to run. They can either be `ExperimentInstance` objects or paths to the pickled
        `ExperimentInstance`s. The list passed in input is not modified.
    use_ray : bool
        If True, it uses `ray` to handle the multiprocessing. By default, the Python default `multiprocessing` module is
        used.
    """

    if len(experiment_instances) == 0:
        return

    # Shuffle can improve speed of multiprocessing. A copy is shuffled so that the
    # list owned by the caller keeps its order.
    experiment_instances = list(experiment_instances)
    np.random.RandomState(42).shuffle(experiment_instances)

    n_instances = len(experiment_instances)
    exp_done = 0
    if config.VERBOSE_LEVEL != 0:
        tqdm.write(f"Completed: {exp_done}/{n_instances}")

    colosseum_cores_config = config.get_available_cores()
    if n_instances >= colosseum_cores_config > 1:
        # Ensure that Colosseum does not use multiprocessing while we run the experiments
        config.disable_multiprocessing()
        try:
            if use_ray:
                ray.init(num_cpus=colosseum_cores_config)
                pool_class = RayPool
            else:
                pool_class = Pool
            try:
                with pool_class(processes=colosseum_cores_config) as p:
                    for _ in p.imap_unordered(
                        run_experiment_instance, experiment_instances
                    ):
                        exp_done += 1
                        if config.VERBOSE_LEVEL != 0:
                            tqdm.write(f"Completed: {exp_done}/{n_instances}")
            finally:
                # Shut ray down even when one of the workers raises
                if use_ray:
                    ray.shutdown()
        finally:
            # Restore the previous configuration, also when an experiment fails
            config.set_available_cores(colosseum_cores_config)
    else:
        for experiment_instance in experiment_instances:
            run_experiment_instance(experiment_instance)
            exp_done += 1
            if config.VERBOSE_LEVEL != 0:
                tqdm.write(f"Completed: {exp_done}/{n_instances}")
runs the ExperimentInstance
s locally. If multiprocessing is enabled in the package configuration, multiple cores
will be used to simultaneously run several ExperimentInstance
s.
Parameters
- experiment_instances (List[Union[ExperimentInstance, str]]):
The
ExperimentInstance
s to run. They can either be ExperimentInstance
objects or paths to the pickled ExperimentInstance
s. - use_ray (bool):
If True, it uses
ray
to handle the multiprocessing. By default, the Python default multiprocessing
module is used.
def
run_experiment_instance( exp_ins: Union[colosseum.experiment.experiment_instance.ExperimentInstance, str]):
def run_experiment_instance(exp_ins: Union[ExperimentInstance, str]):
    """
    runs a single `ExperimentInstance`, which can be passed as `ExperimentInstance` object or as a path to a pickled
    `ExperimentInstance` object.

    Parameters
    ----------
    exp_ins : Union[ExperimentInstance, str]
        The `ExperimentInstance` to run, or the path to a file containing a pickled `ExperimentInstance`.

    Returns
    -------
    ExperimentInstance
        The `ExperimentInstance` that has been run (the unpickled object if a path was given).
    """

    # Load the experiment instance if a path is given
    if isinstance(exp_ins, str):
        # NOTE: unpickling can execute arbitrary code, so only paths to trusted
        # files should be given here.
        with open(exp_ins, "rb") as f:
            exp_ins = pickle.load(f)

    import gin

    apply_gin_config(exp_ins.gin_config_files)

    # The MDP and the agent are instantiated inside their own gin scopes
    with gin.config_scope(exp_ins.mdp_scope):
        mdp = exp_ins.mdp_class(
            seed=exp_ins.seed,
            emission_map=exp_ins.emission_map,
        )
    with gin.config_scope(exp_ins.agent_scope):
        agent = exp_ins.agent_class(
            seed=exp_ins.seed,
            mdp_specs=make_mdp_spec(mdp),
            optimization_horizon=exp_ins.experiment_config.n_steps,
        )

    logger = CSVLogger(
        exp_ins.result_folder,
        add_uid=False,
        label=exp_ins.experiment_label,
        file_name=f"seed{exp_ins.seed}_logs",
    )
    loop = MDPLoop(mdp, agent, logger)
    last_training_step, _ = loop.run(
        exp_ins.experiment_config.n_steps,
        exp_ins.experiment_config.log_performance_indicators_every,
        exp_ins.experiment_config.max_interaction_time_s,
    )

    # NOTE(review): -1 presumably means the interaction was not cut short by the
    # time limit; any other value is recorded in time_exceeded.txt - confirm
    # against MDPLoop.run.
    if last_training_step != -1:
        with open(f"{logger._directory}{os.sep}time_exceeded.txt", "a") as f:
            f.write(
                f"last training step at ({last_training_step}) for {logger.file_path}\n"
            )
    return exp_ins
runs a single ExperimentInstance
, which can be passed as ExperimentInstance
object or as a path to a pickled
ExperimentInstance
object.