colosseum.utils.acme.csv_logger
1import csv 2import os 3import time 4from typing import Sequence, TextIO, Union 5 6import numpy as np 7import pandas as pd 8import toolz 9from absl import logging 10 11from colosseum.utils.acme.base_logger import Logger, LoggingData 12from colosseum.utils.acme.path import process_path 13 14 15class CSVLogger(Logger): 16 """ 17 A custom Logger class inspired by acme Logger. 18 """ 19 20 _open = open 21 22 @property 23 def data(self) -> Sequence[LoggingData]: 24 try: 25 self.flush() 26 except: 27 pass 28 data = pd.read_csv(self._file.name).to_dict() 29 return toolz.valmap(lambda x: list(x.values()), data) 30 31 def __init__( 32 self, 33 directory_or_file: Union[str, TextIO] = "tmp", 34 label: str = "", 35 time_delta: float = 0.0, 36 add_uid: bool = True, 37 flush_every: int = 30, 38 file_name: str = "logs", 39 ): 40 os.makedirs(directory_or_file, exist_ok=True) 41 42 self._label = label 43 self._directory_or_file = directory_or_file 44 self._file_name = file_name 45 self._time_delta = time_delta 46 self._flush_every = flush_every 47 self._add_uid = add_uid 48 49 self.reset() 50 51 if flush_every <= 0: 52 raise ValueError( 53 f"`flush_every` must be a positive integer (got {flush_every})." 54 ) 55 56 def _create_file( 57 self, 58 directory_or_file: Union[str, TextIO], 59 label: str, 60 ) -> TextIO: 61 """Opens a file if input is a directory or use existing file.""" 62 if isinstance(directory_or_file, str): 63 self._directory = process_path( 64 directory_or_file, "logs", label, add_uid=self._add_uid 65 ) 66 file_path = os.path.join(self._directory, f"{self._file_name}.csv") 67 self._file_owner = True 68 return self._open(file_path, mode="w") 69 70 # TextIO instance. 71 file = directory_or_file 72 if label: 73 logging.info("File, not directory, passed to CSVLogger; label not used.") 74 if not file.mode.startswith("a"): 75 raise ValueError( 76 "File must be open in append mode; instead got " f'mode="{file.mode}".' 77 ) 78 return file 79 80 def write(self, data: LoggingData): 81 """Writes a `data` into a row of comma-separated values.""" 82 # Only log if `time_delta` seconds have passed since last logging event. 83 now = time.time() 84 85 elapsed = now - self._last_log_time 86 if elapsed < self._time_delta: 87 logging.debug( 88 "Not due to log for another %.2f seconds, dropping data.", 89 self._time_delta - elapsed, 90 ) 91 return 92 self._last_log_time = now 93 94 # Append row to CSV. 95 data = toolz.valmap(np.array, data) 96 # Use fields from initial `data` to create the header. If extra fields are 97 # present in subsequent `data`, we ignore them. 98 if not self._writer: 99 fields = sorted(data.keys()) 100 self._writer = csv.DictWriter( 101 self._file, fieldnames=fields, extrasaction="ignore" 102 ) 103 # Write header only if the file is empty. 104 if not self._file.tell(): 105 self._writer.writeheader() 106 self._writer.writerow(data) 107 108 # Flush every `flush_every` writes. 109 if self._writes % self._flush_every == 0: 110 self.flush() 111 self._writes += 1 112 113 def close(self): 114 self.flush() 115 if self._file_owner: 116 self._file.close() 117 118 def flush(self): 119 self._file.flush() 120 121 def reset(self) -> None: 122 self._last_log_time = time.time() - self._time_delta 123 self._writer = None 124 self._file_owner = False 125 self._file = self._create_file(self._directory_or_file, self._label) 126 self._writes = 0 127 logging.info("Logging to %s", self.file_path) 128 129 @property 130 def file_path(self) -> str: 131 return self._file.name
16class CSVLogger(Logger): 17 """ 18 A custom Logger class inspired by acme Logger. 19 """ 20 21 _open = open 22 23 @property 24 def data(self) -> Sequence[LoggingData]: 25 try: 26 self.flush() 27 except: 28 pass 29 data = pd.read_csv(self._file.name).to_dict() 30 return toolz.valmap(lambda x: list(x.values()), data) 31 32 def __init__( 33 self, 34 directory_or_file: Union[str, TextIO] = "tmp", 35 label: str = "", 36 time_delta: float = 0.0, 37 add_uid: bool = True, 38 flush_every: int = 30, 39 file_name: str = "logs", 40 ): 41 os.makedirs(directory_or_file, exist_ok=True) 42 43 self._label = label 44 self._directory_or_file = directory_or_file 45 self._file_name = file_name 46 self._time_delta = time_delta 47 self._flush_every = flush_every 48 self._add_uid = add_uid 49 50 self.reset() 51 52 if flush_every <= 0: 53 raise ValueError( 54 f"`flush_every` must be a positive integer (got {flush_every})." 55 ) 56 57 def _create_file( 58 self, 59 directory_or_file: Union[str, TextIO], 60 label: str, 61 ) -> TextIO: 62 """Opens a file if input is a directory or use existing file.""" 63 if isinstance(directory_or_file, str): 64 self._directory = process_path( 65 directory_or_file, "logs", label, add_uid=self._add_uid 66 ) 67 file_path = os.path.join(self._directory, f"{self._file_name}.csv") 68 self._file_owner = True 69 return self._open(file_path, mode="w") 70 71 # TextIO instance. 72 file = directory_or_file 73 if label: 74 logging.info("File, not directory, passed to CSVLogger; label not used.") 75 if not file.mode.startswith("a"): 76 raise ValueError( 77 "File must be open in append mode; instead got " f'mode="{file.mode}".' 78 ) 79 return file 80 81 def write(self, data: LoggingData): 82 """Writes a `data` into a row of comma-separated values.""" 83 # Only log if `time_delta` seconds have passed since last logging event. 84 now = time.time() 85 86 elapsed = now - self._last_log_time 87 if elapsed < self._time_delta: 88 logging.debug( 89 "Not due to log for another %.2f seconds, dropping data.", 90 self._time_delta - elapsed, 91 ) 92 return 93 self._last_log_time = now 94 95 # Append row to CSV. 96 data = toolz.valmap(np.array, data) 97 # Use fields from initial `data` to create the header. If extra fields are 98 # present in subsequent `data`, we ignore them. 99 if not self._writer: 100 fields = sorted(data.keys()) 101 self._writer = csv.DictWriter( 102 self._file, fieldnames=fields, extrasaction="ignore" 103 ) 104 # Write header only if the file is empty. 105 if not self._file.tell(): 106 self._writer.writeheader() 107 self._writer.writerow(data) 108 109 # Flush every `flush_every` writes. 110 if self._writes % self._flush_every == 0: 111 self.flush() 112 self._writes += 1 113 114 def close(self): 115 self.flush() 116 if self._file_owner: 117 self._file.close() 118 119 def flush(self): 120 self._file.flush() 121 122 def reset(self) -> None: 123 self._last_log_time = time.time() - self._time_delta 124 self._writer = None 125 self._file_owner = False 126 self._file = self._create_file(self._directory_or_file, self._label) 127 self._writes = 0 128 logging.info("Logging to %s", self.file_path) 129 130 @property 131 def file_path(self) -> str: 132 return self._file.name
A custom Logger class inspired by acme Logger.
CSVLogger( directory_or_file: Union[str, TextIO] = 'tmp', label: str = '', time_delta: float = 0.0, add_uid: bool = True, flush_every: int = 30, file_name: str = 'logs')
32 def __init__( 33 self, 34 directory_or_file: Union[str, TextIO] = "tmp", 35 label: str = "", 36 time_delta: float = 0.0, 37 add_uid: bool = True, 38 flush_every: int = 30, 39 file_name: str = "logs", 40 ): 41 os.makedirs(directory_or_file, exist_ok=True) 42 43 self._label = label 44 self._directory_or_file = directory_or_file 45 self._file_name = file_name 46 self._time_delta = time_delta 47 self._flush_every = flush_every 48 self._add_uid = add_uid 49 50 self.reset() 51 52 if flush_every <= 0: 53 raise ValueError( 54 f"`flush_every` must be a positive integer (got {flush_every})." 55 )
def
write(self, data: Mapping[str, Any]):
81 def write(self, data: LoggingData): 82 """Writes a `data` into a row of comma-separated values.""" 83 # Only log if `time_delta` seconds have passed since last logging event. 84 now = time.time() 85 86 elapsed = now - self._last_log_time 87 if elapsed < self._time_delta: 88 logging.debug( 89 "Not due to log for another %.2f seconds, dropping data.", 90 self._time_delta - elapsed, 91 ) 92 return 93 self._last_log_time = now 94 95 # Append row to CSV. 96 data = toolz.valmap(np.array, data) 97 # Use fields from initial `data` to create the header. If extra fields are 98 # present in subsequent `data`, we ignore them. 99 if not self._writer: 100 fields = sorted(data.keys()) 101 self._writer = csv.DictWriter( 102 self._file, fieldnames=fields, extrasaction="ignore" 103 ) 104 # Write header only if the file is empty. 105 if not self._file.tell(): 106 self._writer.writeheader() 107 self._writer.writerow(data) 108 109 # Flush every `flush_every` writes. 110 if self._writes % self._flush_every == 0: 111 self.flush() 112 self._writes += 1
Writes a data
into a row of comma-separated values.
def
reset(self) -> None:
122 def reset(self) -> None: 123 self._last_log_time = time.time() - self._time_delta 124 self._writer = None 125 self._file_owner = False 126 self._file = self._create_file(self._directory_or_file, self._label) 127 self._writes = 0 128 logging.info("Logging to %s", self.file_path)
Empties the logger.