colosseum.experiment.folder_structuring
import os
import re
import shutil
from glob import glob
from tempfile import gettempdir
from typing import Dict, List, Set, Tuple, Type

import numpy as np
import pandas as pd
import yaml
from tqdm import tqdm

from colosseum import config
from colosseum.agent.agents.base import BaseAgent
from colosseum.emission_maps import get_emission_map_from_name
from colosseum.experiment import ExperimentConfig
from colosseum.experiment.experiment_instance import ExperimentInstance
from colosseum.mdp import BaseMDP
from colosseum.utils import ensure_folder
from colosseum.utils.miscellanea import get_agent_class_from_name
from colosseum.utils.miscellanea import get_mdp_class_from_name


def _get_classes_scopes(glob_pattern: str, class_from_name) -> Tuple[dict, List[str]]:
    """
    Scans the gin configuration files matching `glob_pattern`.

    Parameters
    ----------
    glob_pattern : str
        The glob pattern selecting the configuration files to read.
    class_from_name
        The callable that maps the class name found in a configuration file to the corresponding class.

    Returns
    -------
    dict
        The dictionary that associates each class with the set of `prms_<n>` scopes found in its configuration file.
    List[str]
        The configuration file paths that were read.

    Raises
    ------
    IndexError
        If a configuration file does not contain any `prms_<n>/<ClassName>.` binding.
    """
    classes_scopes = dict()
    config_files = []
    for config_file in glob(glob_pattern):
        with open(config_file, "r") as f:
            content = f.read()
        scopes = set(re.findall(r"prms_\d+", content))
        # The class name is taken from the first scoped binding of the file.
        class_name = re.findall(r"prms_\d+/(.*?)\.", content)[0]
        # A later file bound to the same class replaces the scopes of an earlier one.
        classes_scopes[class_from_name(class_name)] = scopes
        config_files.append(config_file)
    return classes_scopes, config_files


def get_mdp_agent_gin_configs(
    experiment_folder: str,
) -> Tuple[
    Dict[Type["BaseMDP"], Set[str]], Dict[Type["BaseAgent"], Set[str]], List[str]
]:
    """
    Collects the MDP and agent gin configurations stored in the `mdp_configs` and `agents_configs` sub-folders of the
    experiment folder.

    Returns
    -------
    Dict[Type["BaseMDP"], Set[str]]
        The dictionary that associates each MDP class with the set of gin configuration indices found in the
        experiment folder.
    Dict[Type["BaseAgent"], Set[str]]
        The dictionary that associates each agent class with the set of gin configuration indices found in the
        experiment folder.
    List[str]
        The gin configuration file paths found in the experiment_folder (MDP files first, then agent files).

    Raises
    ------
    AssertionError
        If episodic and infinite horizon classes are mixed in the same experiment folder.
    """
    folder = ensure_folder(experiment_folder)
    mdp_classes_scopes, mdp_config_files = _get_classes_scopes(
        f"{folder}mdp_configs{os.sep}*", get_mdp_class_from_name
    )
    agent_classes_scopes, agent_config_files = _get_classes_scopes(
        f"{folder}agents_configs{os.sep}*", get_agent_class_from_name
    )
    gin_config_files_paths = mdp_config_files + agent_config_files

    # Either every class is episodic or none is.
    classes = list(mdp_classes_scopes) + list(agent_classes_scopes)
    assert sum(c.is_episodic() for c in classes) in [0, len(classes)], (
        "Episodic and infinite horizon agents and/or MDP instances should not be mixed. "
        f"Please check the configuration files of {experiment_folder}."
    )

    return mdp_classes_scopes, agent_classes_scopes, gin_config_files_paths


def _get_experiment_mdp_agent_couples(
    experiment_config: ExperimentConfig,
    experiment_cur_folder: str,
    mdp_classes_scopes: Dict[Type["BaseMDP"], Set[str]],
    agent_classes_scopes: Dict[Type["BaseAgent"], Set[str]],
    gin_config_files_paths: List[str],
) -> List[ExperimentInstance]:
    """
    Builds one `ExperimentInstance` for every combination of seed, MDP class/scope and agent class/scope.

    Returns
    -------
    List[ExperimentInstance]
        The experiment instances whose `does_log_file_exists` attribute is falsy, i.e. the ones that still lack a
        log file.
    """
    experiment_mdp_agent_couples = []
    for seed in range(experiment_config.n_seeds):
        for mdp_class, mdp_scopes in mdp_classes_scopes.items():
            for mdp_scope in mdp_scopes:
                for agent_class, agent_scopes in agent_classes_scopes.items():
                    for agent_scope in agent_scopes:
                        exp_inst = ExperimentInstance(
                            seed,
                            mdp_class,
                            mdp_scope,
                            agent_class,
                            agent_scope,
                            experiment_cur_folder,
                            gin_config_files_paths,
                            experiment_config,
                        )
                        if not exp_inst.does_log_file_exists:
                            experiment_mdp_agent_couples.append(exp_inst)
    return experiment_mdp_agent_couples


def get_experiment_config(experiment_folder: str) -> ExperimentConfig:
    """
    Loads the configuration of an experiment from the `experiment_config.yml` file of its folder.

    Returns
    -------
    ExperimentConfig
        The `ExperimentConfig` corresponding to the experiment folder.

    Raises
    ------
    KeyError
        If one of the mandatory entries is missing from the configuration file.
    """
    config_file = ensure_folder(experiment_folder) + "experiment_config.yml"
    with open(config_file, "r") as f:
        # NOTE(review): yaml.Loader can construct arbitrary Python objects; this is only safe for trusted files.
        raw_config = yaml.load(f, yaml.Loader)

    # The emission map entry is optional and falls back to the "Tabular" one.
    emission_map_name = raw_config.get("emission_map", "Tabular")
    return ExperimentConfig(
        n_seeds=raw_config["n_seeds"],
        n_steps=raw_config["n_steps"],
        max_interaction_time_s=raw_config["max_interaction_time_s"],
        log_performance_indicators_every=raw_config[
            "log_performance_indicators_every"
        ],
        emission_map=get_emission_map_from_name(emission_map_name),
    )


def _clean_time_exceeded_records(log_file: str):
    """
    Checks if the log file has been recorded as an experiment that exceeded the time limit and, if so, cleans the
    record.

    The record is looked up in the `time_exceeded.txt` file located in the same folder as the log file. The first
    line containing `log_file` is dropped, and the record file is deleted when no line is left.
    """
    # os.path.dirname also handles a path without any separator, which the manual slicing would truncate.
    record_file = os.path.join(os.path.dirname(log_file), "time_exceeded.txt")
    if not os.path.exists(record_file):
        return

    with open(record_file, "r") as f:
        records = f.readlines()

    for record in records:
        if log_file in record:
            # Safe although we are iterating: we stop right after the removal.
            records.remove(record)
            break

    if records:
        with open(record_file, "w") as f:
            f.write("".join(records))
    else:
        os.remove(record_file)


def _is_log_file_corrupted(log_file: str, required_steps: Set[int]) -> bool:
    """
    Returns True when the csv log file is empty (at most a header line), has decreasing values in its `steps` column,
    or misses any of the `required_steps`.
    """
    with open(log_file, "r") as f:
        n_lines = sum(1 for _ in f)
    if n_lines <= 1:
        return True

    steps = pd.read_csv(log_file).steps
    # checks whether there are any inconsistencies in the order of the logs
    if (np.diff(steps) < 0).any():
        return True
    # checks that all the steps have been logged
    return not required_steps.issubset(steps.tolist())


def remove_corrupted_log_files(
    experiment_folder: str,
    experiment_config: ExperimentConfig = None,
) -> List[str]:
    """
    Checks if there are any inconsistencies in the log files of an experiment and moves the corrupted ones to the
    system temporary folder.

    Parameters
    ----------
    experiment_folder : str
        The folder of the experiment.
    experiment_config : ExperimentConfig
        The configuration of the experiment. By default, it is loaded from the `experiment_config.yml` file of the
        experiment folder.

    Returns
    -------
    List[str]
        The original paths of the log files that have been moved. The list is empty when the experiment folder has
        no `logs` sub-folder.
    """
    folder = ensure_folder(experiment_folder)
    config_file = folder + "experiment_config.yml"
    assert experiment_config is not None or os.path.isfile(config_file)
    if not os.path.isdir(folder + "logs"):
        # Return an empty list rather than None so that the result always matches the annotated type.
        return []

    if experiment_config is None:
        with open(config_file, "r") as f:
            # NOTE(review): yaml.Loader can construct arbitrary Python objects; this is only safe for trusted files.
            experiment_config = ExperimentConfig(**yaml.load(f, yaml.Loader))

    # The steps that a complete log must contain are the same for every file, so they are computed once.
    required_steps = {
        t
        for t in range(1, experiment_config.n_steps)
        if t % experiment_config.log_performance_indicators_every == 0
    }
    required_steps.add(experiment_config.n_steps - 1)

    file_paths = glob(f"{experiment_folder}{os.sep}**{os.sep}*.csv", recursive=True)
    if config.VERBOSE_LEVEL != 0:
        file_paths = tqdm(file_paths, desc="Checking for corrupted log files")

    corrupted_files = []
    for log_file in file_paths:
        if not _is_log_file_corrupted(log_file, required_steps):
            continue

        # If it was registered that this instance failed due to the time constraint, we remove that record since we
        # are going to run this agent/MDP interaction from scratch.
        _clean_time_exceeded_records(log_file)

        # Moving the file to the temporary file folder just in case we want to double-check the formatting error.
        shutil.move(
            log_file,
            os.path.join(
                gettempdir(),
                f"_{len(corrupted_files)}_{os.path.basename(log_file)}",
            ),
        )
        corrupted_files.append(log_file)
        tqdm.write(
            f"The file {log_file} has been moved to tmp as it has some formatting errors."
        )

    if config.VERBOSE_LEVEL != 0:
        print(corrupted_files)

    return corrupted_files
def
get_mdp_agent_gin_configs( experiment_folder: str) -> Tuple[Dict[Type[colosseum.mdp.base.BaseMDP], Set[str]], Dict[Type[colosseum.agent.agents.base.BaseAgent], Set[str]], List[str]]:
def _get_classes_scopes(glob_pattern: str, class_from_name) -> Tuple[dict, List[str]]:
    """
    Scans the gin configuration files matching `glob_pattern`.

    Parameters
    ----------
    glob_pattern : str
        The glob pattern selecting the configuration files to read.
    class_from_name
        The callable that maps the class name found in a configuration file to the corresponding class.

    Returns
    -------
    dict
        The dictionary that associates each class with the set of `prms_<n>` scopes found in its configuration file.
    List[str]
        The configuration file paths that were read.

    Raises
    ------
    IndexError
        If a configuration file does not contain any `prms_<n>/<ClassName>.` binding.
    """
    classes_scopes = dict()
    config_files = []
    for config_file in glob(glob_pattern):
        with open(config_file, "r") as f:
            content = f.read()
        scopes = set(re.findall(r"prms_\d+", content))
        # The class name is taken from the first scoped binding of the file.
        class_name = re.findall(r"prms_\d+/(.*?)\.", content)[0]
        # A later file bound to the same class replaces the scopes of an earlier one.
        classes_scopes[class_from_name(class_name)] = scopes
        config_files.append(config_file)
    return classes_scopes, config_files


def get_mdp_agent_gin_configs(
    experiment_folder: str,
) -> Tuple[
    Dict[Type["BaseMDP"], Set[str]], Dict[Type["BaseAgent"], Set[str]], List[str]
]:
    """
    Collects the MDP and agent gin configurations stored in the `mdp_configs` and `agents_configs` sub-folders of the
    experiment folder.

    Returns
    -------
    Dict[Type["BaseMDP"], Set[str]]
        The dictionary that associates each MDP class with the set of gin configuration indices found in the
        experiment folder.
    Dict[Type["BaseAgent"], Set[str]]
        The dictionary that associates each agent class with the set of gin configuration indices found in the
        experiment folder.
    List[str]
        The gin configuration file paths found in the experiment_folder (MDP files first, then agent files).

    Raises
    ------
    AssertionError
        If episodic and infinite horizon classes are mixed in the same experiment folder.
    """
    folder = ensure_folder(experiment_folder)
    mdp_classes_scopes, mdp_config_files = _get_classes_scopes(
        f"{folder}mdp_configs{os.sep}*", get_mdp_class_from_name
    )
    agent_classes_scopes, agent_config_files = _get_classes_scopes(
        f"{folder}agents_configs{os.sep}*", get_agent_class_from_name
    )
    gin_config_files_paths = mdp_config_files + agent_config_files

    # Either every class is episodic or none is.
    classes = list(mdp_classes_scopes) + list(agent_classes_scopes)
    assert sum(c.is_episodic() for c in classes) in [0, len(classes)], (
        "Episodic and infinite horizon agents and/or MDP instances should not be mixed. "
        f"Please check the configuration files of {experiment_folder}."
    )

    return mdp_classes_scopes, agent_classes_scopes, gin_config_files_paths
Returns
- Dict[Type["BaseMDP"], Set[str]]: The dictionary that associates each MDP class with the set of gin configuration indices found in the experiment folder.
- Dict[Type["BaseAgent"], Set[str]]: The dictionary that associates each agent class with the set of gin configuration indices found in the experiment folder.
- List[str]: The gin configuration file paths found in the experiment_folder.
def get_experiment_config(experiment_folder: str) -> ExperimentConfig:
    """
    Loads the configuration of an experiment from the `experiment_config.yml` file of its folder.

    Returns
    -------
    ExperimentConfig
        The `ExperimentConfig` corresponding to the experiment folder.
    """
    config_path = ensure_folder(experiment_folder) + "experiment_config.yml"
    with open(config_path, "r") as config_stream:
        # NOTE(review): yaml.Loader can construct arbitrary Python objects; this is only safe for trusted files.
        raw_config = yaml.load(config_stream, yaml.Loader)

    # The emission map entry is optional and falls back to the "Tabular" one.
    emission_map_name = raw_config.get("emission_map", "Tabular")
    return ExperimentConfig(
        n_seeds=raw_config["n_seeds"],
        n_steps=raw_config["n_steps"],
        max_interaction_time_s=raw_config["max_interaction_time_s"],
        log_performance_indicators_every=raw_config[
            "log_performance_indicators_every"
        ],
        emission_map=get_emission_map_from_name(emission_map_name),
    )
Returns
- ExperimentConfig: The `ExperimentConfig` corresponding to the experiment folder.
def
remove_corrupted_log_files( experiment_folder: str, experiment_config: colosseum.experiment.config.ExperimentConfig = None) -> List[str]:
def _is_log_file_corrupted(log_file: str, required_steps: Set[int]) -> bool:
    """
    Returns True when the csv log file is empty (at most a header line), has decreasing values in its `steps` column,
    or misses any of the `required_steps`.
    """
    with open(log_file, "r") as f:
        n_lines = sum(1 for _ in f)
    if n_lines <= 1:
        return True

    steps = pd.read_csv(log_file).steps
    # checks whether there are any inconsistencies in the order of the logs
    if (np.diff(steps) < 0).any():
        return True
    # checks that all the steps have been logged
    return not required_steps.issubset(steps.tolist())


def remove_corrupted_log_files(
    experiment_folder: str,
    experiment_config: ExperimentConfig = None,
) -> List[str]:
    """
    Checks if there are any inconsistencies in the log files of an experiment and moves the corrupted ones to the
    system temporary folder.

    Parameters
    ----------
    experiment_folder : str
        The folder of the experiment.
    experiment_config : ExperimentConfig
        The configuration of the experiment. By default, it is loaded from the `experiment_config.yml` file of the
        experiment folder.

    Returns
    -------
    List[str]
        The original paths of the log files that have been moved. The list is empty when the experiment folder has
        no `logs` sub-folder.
    """
    folder = ensure_folder(experiment_folder)
    config_file = folder + "experiment_config.yml"
    assert experiment_config is not None or os.path.isfile(config_file)
    if not os.path.isdir(folder + "logs"):
        # Return an empty list rather than None so that the result always matches the annotated type.
        return []

    if experiment_config is None:
        with open(config_file, "r") as f:
            # NOTE(review): yaml.Loader can construct arbitrary Python objects; this is only safe for trusted files.
            experiment_config = ExperimentConfig(**yaml.load(f, yaml.Loader))

    # The steps that a complete log must contain are the same for every file, so they are computed once.
    required_steps = {
        t
        for t in range(1, experiment_config.n_steps)
        if t % experiment_config.log_performance_indicators_every == 0
    }
    required_steps.add(experiment_config.n_steps - 1)

    file_paths = glob(f"{experiment_folder}{os.sep}**{os.sep}*.csv", recursive=True)
    if config.VERBOSE_LEVEL != 0:
        file_paths = tqdm(file_paths, desc="Checking for corrupted log files")

    corrupted_files = []
    for log_file in file_paths:
        if not _is_log_file_corrupted(log_file, required_steps):
            continue

        # If it was registered that this instance failed due to the time constraint, we remove that record since we
        # are going to run this agent/MDP interaction from scratch.
        _clean_time_exceeded_records(log_file)

        # Moving the file to the temporary file folder just in case we want to double-check the formatting error.
        shutil.move(
            log_file,
            os.path.join(
                gettempdir(),
                f"_{len(corrupted_files)}_{os.path.basename(log_file)}",
            ),
        )
        corrupted_files.append(log_file)
        tqdm.write(
            f"The file {log_file} has been moved to tmp as it has some formatting errors."
        )

    if config.VERBOSE_LEVEL != 0:
        print(corrupted_files)

    return corrupted_files
Checks whether there are any inconsistencies in the log files of an experiment and moves the corrupted files out of the experiment folder, into the system temporary directory.