colosseum.experiment.folder_structuring

  1import os
  2import re
  3import shutil
  4from glob import glob
  5from tempfile import gettempdir
  6from typing import Dict, List, Set, Tuple, Type
  7
  8import numpy as np
  9import pandas as pd
 10import yaml
 11from tqdm import tqdm
 12
 13from colosseum import config
 14from colosseum.agent.agents.base import BaseAgent
 15from colosseum.emission_maps import get_emission_map_from_name
 16from colosseum.experiment import ExperimentConfig
 17from colosseum.experiment.experiment_instance import ExperimentInstance
 18from colosseum.mdp import BaseMDP
 19from colosseum.utils import ensure_folder
 20from colosseum.utils.miscellanea import get_agent_class_from_name
 21from colosseum.utils.miscellanea import get_mdp_class_from_name
 22
 23
 24def get_mdp_agent_gin_configs(
 25    experiment_folder: str,
 26) -> Tuple[
 27    Dict[Type["BaseMDP"], Set[str]], Dict[Type["BaseAgent"], Set[str]], List[str]
 28]:
 29    """
 30    Returns
 31    -------
 32    Dict[Type["BaseMDP"], Set[str]]
 33        The dictionary that associated to each MDP class the set of gin configuration indices found in the experiment
 34        folder.
 35    Dict[Type["BaseAgent"], Set[str]]
 36        The dictionary that associated to each agent class the set of gin configuration indices found in the experiment
 37        folder.
 38    List[str]
 39        The gin configuration file paths found in the experiment_folder.
 40    """
 41    gin_config_files_paths = []
 42
 43    mdp_classes_scopes = dict()
 44    for mdp_config_file in glob(
 45        f"{ensure_folder(experiment_folder)}mdp_configs{os.sep}*"
 46    ):
 47        with open(mdp_config_file, "r") as f:
 48            f_ = f.read()
 49        mdp_scopes = set(re.findall(r"prms_\d+", f_))
 50        mdp_class_name = re.findall(r"prms_\d+/(.*?)\.", f_)[0]
 51        mdp_class = get_mdp_class_from_name(mdp_class_name)
 52        mdp_classes_scopes[mdp_class] = mdp_scopes
 53        gin_config_files_paths.append(mdp_config_file)
 54
 55    agent_classes_scopes = dict()
 56    for agent_config_file in glob(
 57        f"{ensure_folder(experiment_folder)}agents_configs{os.sep}*"
 58    ):
 59        with open(agent_config_file, "r") as f:
 60            f_ = f.read()
 61        agent_scopes = set(re.findall(r"prms_\d+", f_))
 62        agent_class_name = re.findall(r"prms_\d+/(.*?)\.", f_)[0]
 63        agent_class = get_agent_class_from_name(agent_class_name)
 64        agent_classes_scopes[agent_class] = agent_scopes
 65        gin_config_files_paths.append(agent_config_file)
 66
 67    classes = list(mdp_classes_scopes.keys()) + list(agent_classes_scopes.keys())
 68    assert sum([c.is_episodic() for c in classes]) in [0, len(classes)], (
 69        f"Episodic and infinite horizon agents and/or MDP instances should not be mixed."
 70        f"Please check the configuration files of {experiment_folder}."
 71    )
 72
 73    return mdp_classes_scopes, agent_classes_scopes, gin_config_files_paths
 74
 75
 76def _get_experiment_mdp_agent_couples(
 77    experiment_config: ExperimentConfig,
 78    experiment_cur_folder: str,
 79    mdp_classes_scopes: Dict[Type["BaseMDP"], Set[str]],
 80    agent_classes_scopes: Dict[Type["BaseAgent"], Set[str]],
 81    gin_config_files_paths: List[str],
 82) -> List[ExperimentInstance]:
 83    experiment_mdp_agent_couples = []
 84    for seed in range(experiment_config.n_seeds):
 85        for mdp_class, mdp_scopes in mdp_classes_scopes.items():
 86            for mdp_scope in mdp_scopes:
 87                for (
 88                    agent_class,
 89                    agent_scopes,
 90                ) in agent_classes_scopes.items():
 91                    for agent_scope in agent_scopes:
 92                        exp_inst = ExperimentInstance(
 93                            seed,
 94                            mdp_class,
 95                            mdp_scope,
 96                            agent_class,
 97                            agent_scope,
 98                            experiment_cur_folder,
 99                            gin_config_files_paths,
100                            experiment_config,
101                        )
102                        if not exp_inst.does_log_file_exists:
103                            experiment_mdp_agent_couples.append(exp_inst)
104    return experiment_mdp_agent_couples
105
106
def get_experiment_config(experiment_folder: str) -> ExperimentConfig:
    """
    Reads `experiment_config.yml` from the experiment folder and builds the corresponding `ExperimentConfig`.

    Returns
    -------
    ExperimentConfig
        The `ExperimentConfig` corresponding to the experiment folder. When the file has no `emission_map` entry,
        the emission map named "Tabular" is used.
    """
    config_path = ensure_folder(experiment_folder) + "experiment_config.yml"
    with open(config_path, "r") as config_stream:
        # NOTE(review): yaml.Loader is the full (unsafe) PyYAML loader; only use this on trusted experiment folders.
        raw_config = yaml.load(config_stream, yaml.Loader)

    n_seeds = raw_config["n_seeds"]
    n_steps = raw_config["n_steps"]
    max_interaction_time_s = raw_config["max_interaction_time_s"]
    log_every = raw_config["log_performance_indicators_every"]
    emission_map = get_emission_map_from_name(
        raw_config.get("emission_map", "Tabular")
    )

    return ExperimentConfig(
        n_seeds=n_seeds,
        n_steps=n_steps,
        max_interaction_time_s=max_interaction_time_s,
        log_performance_indicators_every=log_every,
        emission_map=emission_map,
    )
131
132
133def _clean_time_exceeded_records(log_file: str):
134    """
135    checks if the log file has been classified as an experiment that exceeded the time limit and, if so, it cleans the
136    record.
137    """
138    time_exceeded_experiment_record = (
139        log_file[: log_file.rfind(os.sep)] + os.sep + "time_exceeded.txt"
140    )
141    if os.path.exists(time_exceeded_experiment_record):
142        with open(time_exceeded_experiment_record, "r") as ff:
143            te = ff.readlines()
144        for tee in te:
145            if log_file in tee:
146                te.remove(tee)
147                break
148        if len(te) > 0:
149            with open(time_exceeded_experiment_record, "w") as ff:
150                ff.write("".join(te))
151        else:
152            os.remove(time_exceeded_experiment_record)
153
154
def remove_corrupted_log_files(
    experiment_folder: str,
    experiment_config: ExperimentConfig = None,
) -> List[str]:
    """
    Checks if there are any inconsistencies in the log files of an experiment and moves the inconsistent files to
    the temporary folder.

    A csv file is considered corrupted when it has at most one line, when its `steps` column decreases at some
    point, or when it misses one of the expected steps (every multiple of `log_performance_indicators_every` below
    `n_steps`, plus `n_steps - 1`).

    Parameters
    ----------
    experiment_folder : str
        The folder of the experiment. All the csv files found recursively in it are checked.
    experiment_config : ExperimentConfig
        The configuration of the experiment. By default, it is loaded from the `experiment_config.yml` file in the
        experiment folder.

    Returns
    -------
    List[str]
        The original paths of the files that have been moved. It is empty when the experiment folder has no `logs`
        sub-folder.
    """
    config_file = ensure_folder(experiment_folder) + "experiment_config.yml"
    assert experiment_config is not None or os.path.isfile(config_file)
    if not os.path.isdir(ensure_folder(experiment_folder) + "logs"):
        # Return an empty list rather than None so that the result always matches the annotated type.
        return []

    if experiment_config is None:
        # Load the configuration the same way as the rest of the module (default emission map included).
        experiment_config = get_experiment_config(experiment_folder)

    file_paths = glob(f"{experiment_folder}{os.sep}**{os.sep}*.csv", recursive=True)
    if config.VERBOSE_LEVEL != 0:
        file_paths = tqdm(file_paths, desc="Checking for corrupted log files")

    n_steps = experiment_config.n_steps
    log_every = experiment_config.log_performance_indicators_every

    corrupted_files = []
    for f in file_paths:
        with open(f, "r") as ff:
            n_lines = sum(1 for _ in ff)

        # A file with at most the header line has no logged data at all.
        is_corrupted = n_lines <= 1
        if not is_corrupted:
            # The file is parsed once; a set makes the membership tests below constant time.
            steps = pd.read_csv(f).steps
            logged_steps = set(steps.tolist())
            is_corrupted = (
                # checks whether there are any inconsistencies in the order of the logs
                any(np.diff(steps) < 0)
                # checks that all the steps have been logged
                or not all(
                    t in logged_steps
                    for t in range(1, n_steps)
                    if t % log_every == 0
                )
                or (n_steps - 1) not in logged_steps
            )
        if not is_corrupted:
            continue

        # If it was registered that this instance failed due to the time constraint, we remove that record since we
        # are going to run this agent/MDP interaction from scratch.
        _clean_time_exceeded_records(f)

        # Moving the file to the temporary file folder just in case we want to double-check the formatting error.
        # The running index keeps equally named files from different folders apart.
        shutil.move(
            f,
            os.path.join(
                gettempdir(), f"_{len(corrupted_files)}_{os.path.basename(f)}"
            ),
        )
        corrupted_files.append(f)
        tqdm.write(
            f"The file {f} has been moved to tmp as it has some formatting errors."
        )

    if config.VERBOSE_LEVEL != 0:
        print(corrupted_files)

    return corrupted_files
def get_mdp_agent_gin_configs( experiment_folder: str) -> Tuple[Dict[Type[colosseum.mdp.base.BaseMDP], Set[str]], Dict[Type[colosseum.agent.agents.base.BaseAgent], Set[str]], List[str]]:
def get_mdp_agent_gin_configs(
    experiment_folder: str,
) -> Tuple[
    Dict[Type["BaseMDP"], Set[str]], Dict[Type["BaseAgent"], Set[str]], List[str]
]:
    """
    Scans the `mdp_configs` and `agents_configs` sub-folders of the experiment folder and collects, for every gin
    configuration file found there, the class it configures and the `prms_<n>` scopes it contains.

    Returns
    -------
    Dict[Type["BaseMDP"], Set[str]]
        The dictionary that associates each MDP class with the set of gin configuration indices found in the
        experiment folder.
    Dict[Type["BaseAgent"], Set[str]]
        The dictionary that associates each agent class with the set of gin configuration indices found in the
        experiment folder.
    List[str]
        The gin configuration file paths found in the experiment_folder (MDP files first, then agent files).

    Raises
    ------
    IndexError
        If a configuration file does not contain any `prms_<n>/<ClassName>.` binding.
    AssertionError
        If episodic and infinite horizon classes are mixed.
    """
    gin_config_files_paths = []

    def _collect_classes_scopes(configs_subfolder, get_class_from_name):
        # Shared scanning logic for the MDP and the agent configuration folders.
        classes_scopes = dict()
        for config_file in glob(
            f"{ensure_folder(experiment_folder)}{configs_subfolder}{os.sep}*"
        ):
            with open(config_file, "r") as f:
                content = f.read()
            scopes = set(re.findall(r"prms_\d+", content))
            # The class name is the text between "prms_<n>/" and the first following dot.
            class_name = re.findall(r"prms_\d+/(.*?)\.", content)[0]
            # If two files configure the same class, the last one scanned wins.
            classes_scopes[get_class_from_name(class_name)] = scopes
            gin_config_files_paths.append(config_file)
        return classes_scopes

    mdp_classes_scopes = _collect_classes_scopes(
        "mdp_configs", get_mdp_class_from_name
    )
    agent_classes_scopes = _collect_classes_scopes(
        "agents_configs", get_agent_class_from_name
    )

    # Either every class is episodic or none of them is.
    classes = list(mdp_classes_scopes) + list(agent_classes_scopes)
    n_episodic = sum(c.is_episodic() for c in classes)
    assert n_episodic in (0, len(classes)), (
        "Episodic and infinite horizon agents and/or MDP instances should not be mixed. "
        f"Please check the configuration files of {experiment_folder}."
    )

    return mdp_classes_scopes, agent_classes_scopes, gin_config_files_paths
Returns
  • Dict[Type["BaseMDP"], Set[str]]: The dictionary that associates each MDP class with the set of gin configuration indices found in the experiment folder.
  • Dict[Type["BaseAgent"], Set[str]]: The dictionary that associates each agent class with the set of gin configuration indices found in the experiment folder.
  • List[str]: The gin configuration file paths found in the experiment_folder.
def get_experiment_config(experiment_folder: str) -> colosseum.experiment.config.ExperimentConfig:
def get_experiment_config(experiment_folder: str) -> ExperimentConfig:
    """
    Reads `experiment_config.yml` from the experiment folder and builds the corresponding `ExperimentConfig`.

    Returns
    -------
    ExperimentConfig
        The `ExperimentConfig` corresponding to the experiment folder. When the file has no `emission_map` entry,
        the emission map named "Tabular" is used.
    """
    config_path = ensure_folder(experiment_folder) + "experiment_config.yml"
    with open(config_path, "r") as config_stream:
        # NOTE(review): yaml.Loader is the full (unsafe) PyYAML loader; only use this on trusted experiment folders.
        raw_config = yaml.load(config_stream, yaml.Loader)

    n_seeds = raw_config["n_seeds"]
    n_steps = raw_config["n_steps"]
    max_interaction_time_s = raw_config["max_interaction_time_s"]
    log_every = raw_config["log_performance_indicators_every"]
    emission_map = get_emission_map_from_name(
        raw_config.get("emission_map", "Tabular")
    )

    return ExperimentConfig(
        n_seeds=n_seeds,
        n_steps=n_steps,
        max_interaction_time_s=max_interaction_time_s,
        log_performance_indicators_every=log_every,
        emission_map=emission_map,
    )
Returns
  • ExperimentConfig: The ExperimentConfig corresponding to the experiment folder.
def remove_corrupted_log_files( experiment_folder: str, experiment_config: colosseum.experiment.config.ExperimentConfig = None) -> List[str]:
def remove_corrupted_log_files(
    experiment_folder: str,
    experiment_config: ExperimentConfig = None,
) -> List[str]:
    """
    Checks if there are any inconsistencies in the log files of an experiment and moves the inconsistent files to
    the temporary folder.

    A csv file is considered corrupted when it has at most one line, when its `steps` column decreases at some
    point, or when it misses one of the expected steps (every multiple of `log_performance_indicators_every` below
    `n_steps`, plus `n_steps - 1`).

    Parameters
    ----------
    experiment_folder : str
        The folder of the experiment. All the csv files found recursively in it are checked.
    experiment_config : ExperimentConfig
        The configuration of the experiment. By default, it is loaded from the `experiment_config.yml` file in the
        experiment folder.

    Returns
    -------
    List[str]
        The original paths of the files that have been moved. It is empty when the experiment folder has no `logs`
        sub-folder.
    """
    config_file = ensure_folder(experiment_folder) + "experiment_config.yml"
    assert experiment_config is not None or os.path.isfile(config_file)
    if not os.path.isdir(ensure_folder(experiment_folder) + "logs"):
        # Return an empty list rather than None so that the result always matches the annotated type.
        return []

    if experiment_config is None:
        # Load the configuration the same way as the rest of the module (default emission map included).
        experiment_config = get_experiment_config(experiment_folder)

    file_paths = glob(f"{experiment_folder}{os.sep}**{os.sep}*.csv", recursive=True)
    if config.VERBOSE_LEVEL != 0:
        file_paths = tqdm(file_paths, desc="Checking for corrupted log files")

    n_steps = experiment_config.n_steps
    log_every = experiment_config.log_performance_indicators_every

    corrupted_files = []
    for f in file_paths:
        with open(f, "r") as ff:
            n_lines = sum(1 for _ in ff)

        # A file with at most the header line has no logged data at all.
        is_corrupted = n_lines <= 1
        if not is_corrupted:
            # The file is parsed once; a set makes the membership tests below constant time.
            steps = pd.read_csv(f).steps
            logged_steps = set(steps.tolist())
            is_corrupted = (
                # checks whether there are any inconsistencies in the order of the logs
                any(np.diff(steps) < 0)
                # checks that all the steps have been logged
                or not all(
                    t in logged_steps
                    for t in range(1, n_steps)
                    if t % log_every == 0
                )
                or (n_steps - 1) not in logged_steps
            )
        if not is_corrupted:
            continue

        # If it was registered that this instance failed due to the time constraint, we remove that record since we
        # are going to run this agent/MDP interaction from scratch.
        _clean_time_exceeded_records(f)

        # Moving the file to the temporary file folder just in case we want to double-check the formatting error.
        # The running index keeps equally named files from different folders apart.
        shutil.move(
            f,
            os.path.join(
                gettempdir(), f"_{len(corrupted_files)}_{os.path.basename(f)}"
            ),
        )
        corrupted_files.append(f)
        tqdm.write(
            f"The file {f} has been moved to tmp as it has some formatting errors."
        )

    if config.VERBOSE_LEVEL != 0:
        print(corrupted_files)

    return corrupted_files

Checks whether there are any inconsistencies in the log files of an experiment and removes the inconsistent files.