colosseum.utils.acme.csv_logger

  1import csv
  2import os
  3import time
  4from typing import Sequence, TextIO, Union
  5
  6import numpy as np
  7import pandas as pd
  8import toolz
  9from absl import logging
 10
 11from colosseum.utils.acme.base_logger import Logger, LoggingData
 12from colosseum.utils.acme.path import process_path
 13
 14
 15class CSVLogger(Logger):
 16    """
 17    A custom Logger class inspired by acme Logger.
 18    """
 19
 20    _open = open
 21
 22    @property
 23    def data(self) -> Sequence[LoggingData]:
 24        try:
 25            self.flush()
 26        except:
 27            pass
 28        data = pd.read_csv(self._file.name).to_dict()
 29        return toolz.valmap(lambda x: list(x.values()), data)
 30
 31    def __init__(
 32        self,
 33        directory_or_file: Union[str, TextIO] = "tmp",
 34        label: str = "",
 35        time_delta: float = 0.0,
 36        add_uid: bool = True,
 37        flush_every: int = 30,
 38        file_name: str = "logs",
 39    ):
 40        os.makedirs(directory_or_file, exist_ok=True)
 41
 42        self._label = label
 43        self._directory_or_file = directory_or_file
 44        self._file_name = file_name
 45        self._time_delta = time_delta
 46        self._flush_every = flush_every
 47        self._add_uid = add_uid
 48
 49        self.reset()
 50
 51        if flush_every <= 0:
 52            raise ValueError(
 53                f"`flush_every` must be a positive integer (got {flush_every})."
 54            )
 55
 56    def _create_file(
 57        self,
 58        directory_or_file: Union[str, TextIO],
 59        label: str,
 60    ) -> TextIO:
 61        """Opens a file if input is a directory or use existing file."""
 62        if isinstance(directory_or_file, str):
 63            self._directory = process_path(
 64                directory_or_file, "logs", label, add_uid=self._add_uid
 65            )
 66            file_path = os.path.join(self._directory, f"{self._file_name}.csv")
 67            self._file_owner = True
 68            return self._open(file_path, mode="w")
 69
 70        # TextIO instance.
 71        file = directory_or_file
 72        if label:
 73            logging.info("File, not directory, passed to CSVLogger; label not used.")
 74        if not file.mode.startswith("a"):
 75            raise ValueError(
 76                "File must be open in append mode; instead got " f'mode="{file.mode}".'
 77            )
 78        return file
 79
 80    def write(self, data: LoggingData):
 81        """Writes a `data` into a row of comma-separated values."""
 82        # Only log if `time_delta` seconds have passed since last logging event.
 83        now = time.time()
 84
 85        elapsed = now - self._last_log_time
 86        if elapsed < self._time_delta:
 87            logging.debug(
 88                "Not due to log for another %.2f seconds, dropping data.",
 89                self._time_delta - elapsed,
 90            )
 91            return
 92        self._last_log_time = now
 93
 94        # Append row to CSV.
 95        data = toolz.valmap(np.array, data)
 96        # Use fields from initial `data` to create the header. If extra fields are
 97        # present in subsequent `data`, we ignore them.
 98        if not self._writer:
 99            fields = sorted(data.keys())
100            self._writer = csv.DictWriter(
101                self._file, fieldnames=fields, extrasaction="ignore"
102            )
103            # Write header only if the file is empty.
104            if not self._file.tell():
105                self._writer.writeheader()
106        self._writer.writerow(data)
107
108        # Flush every `flush_every` writes.
109        if self._writes % self._flush_every == 0:
110            self.flush()
111        self._writes += 1
112
113    def close(self):
114        self.flush()
115        if self._file_owner:
116            self._file.close()
117
118    def flush(self):
119        self._file.flush()
120
121    def reset(self) -> None:
122        self._last_log_time = time.time() - self._time_delta
123        self._writer = None
124        self._file_owner = False
125        self._file = self._create_file(self._directory_or_file, self._label)
126        self._writes = 0
127        logging.info("Logging to %s", self.file_path)
128
129    @property
130    def file_path(self) -> str:
131        return self._file.name
class CSVLogger(colosseum.utils.acme.base_logger.Logger):
 16class CSVLogger(Logger):
 17    """
 18    A custom Logger class inspired by acme Logger.
 19    """
 20
 21    _open = open
 22
 23    @property
 24    def data(self) -> Sequence[LoggingData]:
 25        try:
 26            self.flush()
 27        except:
 28            pass
 29        data = pd.read_csv(self._file.name).to_dict()
 30        return toolz.valmap(lambda x: list(x.values()), data)
 31
 32    def __init__(
 33        self,
 34        directory_or_file: Union[str, TextIO] = "tmp",
 35        label: str = "",
 36        time_delta: float = 0.0,
 37        add_uid: bool = True,
 38        flush_every: int = 30,
 39        file_name: str = "logs",
 40    ):
 41        os.makedirs(directory_or_file, exist_ok=True)
 42
 43        self._label = label
 44        self._directory_or_file = directory_or_file
 45        self._file_name = file_name
 46        self._time_delta = time_delta
 47        self._flush_every = flush_every
 48        self._add_uid = add_uid
 49
 50        self.reset()
 51
 52        if flush_every <= 0:
 53            raise ValueError(
 54                f"`flush_every` must be a positive integer (got {flush_every})."
 55            )
 56
 57    def _create_file(
 58        self,
 59        directory_or_file: Union[str, TextIO],
 60        label: str,
 61    ) -> TextIO:
 62        """Opens a file if input is a directory or use existing file."""
 63        if isinstance(directory_or_file, str):
 64            self._directory = process_path(
 65                directory_or_file, "logs", label, add_uid=self._add_uid
 66            )
 67            file_path = os.path.join(self._directory, f"{self._file_name}.csv")
 68            self._file_owner = True
 69            return self._open(file_path, mode="w")
 70
 71        # TextIO instance.
 72        file = directory_or_file
 73        if label:
 74            logging.info("File, not directory, passed to CSVLogger; label not used.")
 75        if not file.mode.startswith("a"):
 76            raise ValueError(
 77                "File must be open in append mode; instead got " f'mode="{file.mode}".'
 78            )
 79        return file
 80
 81    def write(self, data: LoggingData):
 82        """Writes a `data` into a row of comma-separated values."""
 83        # Only log if `time_delta` seconds have passed since last logging event.
 84        now = time.time()
 85
 86        elapsed = now - self._last_log_time
 87        if elapsed < self._time_delta:
 88            logging.debug(
 89                "Not due to log for another %.2f seconds, dropping data.",
 90                self._time_delta - elapsed,
 91            )
 92            return
 93        self._last_log_time = now
 94
 95        # Append row to CSV.
 96        data = toolz.valmap(np.array, data)
 97        # Use fields from initial `data` to create the header. If extra fields are
 98        # present in subsequent `data`, we ignore them.
 99        if not self._writer:
100            fields = sorted(data.keys())
101            self._writer = csv.DictWriter(
102                self._file, fieldnames=fields, extrasaction="ignore"
103            )
104            # Write header only if the file is empty.
105            if not self._file.tell():
106                self._writer.writeheader()
107        self._writer.writerow(data)
108
109        # Flush every `flush_every` writes.
110        if self._writes % self._flush_every == 0:
111            self.flush()
112        self._writes += 1
113
114    def close(self):
115        self.flush()
116        if self._file_owner:
117            self._file.close()
118
119    def flush(self):
120        self._file.flush()
121
122    def reset(self) -> None:
123        self._last_log_time = time.time() - self._time_delta
124        self._writer = None
125        self._file_owner = False
126        self._file = self._create_file(self._directory_or_file, self._label)
127        self._writes = 0
128        logging.info("Logging to %s", self.file_path)
129
130    @property
131    def file_path(self) -> str:
132        return self._file.name

A custom Logger class inspired by acme Logger.

CSVLogger( directory_or_file: Union[str, TextIO] = 'tmp', label: str = '', time_delta: float = 0.0, add_uid: bool = True, flush_every: int = 30, file_name: str = 'logs')
32    def __init__(
33        self,
34        directory_or_file: Union[str, TextIO] = "tmp",
35        label: str = "",
36        time_delta: float = 0.0,
37        add_uid: bool = True,
38        flush_every: int = 30,
39        file_name: str = "logs",
40    ):
41        os.makedirs(directory_or_file, exist_ok=True)
42
43        self._label = label
44        self._directory_or_file = directory_or_file
45        self._file_name = file_name
46        self._time_delta = time_delta
47        self._flush_every = flush_every
48        self._add_uid = add_uid
49
50        self.reset()
51
52        if flush_every <= 0:
53            raise ValueError(
54                f"`flush_every` must be a positive integer (got {flush_every})."
55            )
def write(self, data: Mapping[str, Any]):
 81    def write(self, data: LoggingData):
 82        """Writes a `data` into a row of comma-separated values."""
 83        # Only log if `time_delta` seconds have passed since last logging event.
 84        now = time.time()
 85
 86        elapsed = now - self._last_log_time
 87        if elapsed < self._time_delta:
 88            logging.debug(
 89                "Not due to log for another %.2f seconds, dropping data.",
 90                self._time_delta - elapsed,
 91            )
 92            return
 93        self._last_log_time = now
 94
 95        # Append row to CSV.
 96        data = toolz.valmap(np.array, data)
 97        # Use fields from initial `data` to create the header. If extra fields are
 98        # present in subsequent `data`, we ignore them.
 99        if not self._writer:
100            fields = sorted(data.keys())
101            self._writer = csv.DictWriter(
102                self._file, fieldnames=fields, extrasaction="ignore"
103            )
104            # Write header only if the file is empty.
105            if not self._file.tell():
106                self._writer.writeheader()
107        self._writer.writerow(data)
108
109        # Flush every `flush_every` writes.
110        if self._writes % self._flush_every == 0:
111            self.flush()
112        self._writes += 1

Writes `data` as a row of comma-separated values.

def close(self):
114    def close(self):
115        self.flush()
116        if self._file_owner:
117            self._file.close()

Closes the logger, not expecting any further write.

def flush(self):
119    def flush(self):
120        self._file.flush()
def reset(self) -> None:
122    def reset(self) -> None:
123        self._last_log_time = time.time() - self._time_delta
124        self._writer = None
125        self._file_owner = False
126        self._file = self._create_file(self._directory_or_file, self._label)
127        self._writes = 0
128        logging.info("Logging to %s", self.file_path)

Empties the logger.