import threading
import time
from typing import List, Dict, Optional, Any, TextIO
from collections import namedtuple
import sys
from .enums import LogLevel
from . import conf
# Immutable value object for one buffered log entry. Positional order:
# timestamp, thread_name, component_id, event, details, level.
LogRecord = namedtuple(
    'LogRecord',
    'timestamp thread_name component_id event details level',
)
class Tracer:
    """
    A thread-safe, centralized logging utility for the simulation engine.

    ``log`` appends records to a shared in-memory buffer; a background daemon
    thread wakes up every ``flush_interval_seconds`` and writes the buffered
    records, column-aligned, to the console and/or a log file. Tracebacks of
    exception records can additionally be diverted to a separate error file.

    All methods are classmethods and all state is stored on the class, so the
    tracer acts as a single process-wide instance.
    """

    # Records waiting to be written; guarded by ``_lock``.
    _log_buffer: List[LogRecord] = []
    # Protects ``_log_buffer`` and the start/stop state transitions.
    _lock = threading.Lock()
    # Background flusher; ``None`` means the tracer is not running.
    _thread: Optional[threading.Thread] = None
    # Set by ``stop`` to make the flusher loop exit.
    _stop_event = threading.Event()
    # Records below this level are discarded by ``log``.
    _level: LogLevel = LogLevel.INFO
    # Seconds between two periodic flushes.
    _flush_interval: float = 1.0
    # Path and handle of the main log file (both ``None`` when disabled).
    _output_file: Optional[str] = None
    _log_to_console: bool = True
    _file_handle: Optional[TextIO] = None
    # Path and handle of the traceback file (both ``None`` when disabled).
    _error_file: Optional[str] = None
    _error_file_handle: Optional[TextIO] = None

    @classmethod
    def start(
        cls,
        level: LogLevel,
        flush_interval_seconds: float,
        output_file: Optional[str],
        log_to_console: bool = True,
        error_file: Optional[str] = "errors.log",
    ) -> None:
        """
        Initializes and starts the tracer thread.

        If the tracer is already running, an error is printed to stderr and
        the call has no other effect.

        Args:
            level: The minimum log level to record.
            flush_interval_seconds: How often to flush logs.
            output_file: An optional file path to write logs to. The file is
                truncated on start.
            log_to_console: If True, logs will also be printed to the console.
            error_file: An optional file path to write detailed exception
                tracebacks to. The file is truncated on start.

        Raises:
            OSError: If one of the output files cannot be opened. No file
                handle is left open in that case.
        """
        with cls._lock:
            if cls._thread is not None:
                print(conf.TRACER_ERROR_ALREADY_STARTED, file=sys.stderr)
                return

            cls._level = level
            cls._flush_interval = flush_interval_seconds
            cls._output_file = output_file
            cls._log_to_console = log_to_console
            cls._error_file = error_file

            if cls._output_file:
                # Mode 'w' clears whatever a previous run left in the file.
                cls._file_handle = open(cls._output_file, 'w', encoding='utf-8')
            if cls._error_file:
                try:
                    cls._error_file_handle = open(cls._error_file, 'w', encoding='utf-8')
                except OSError:
                    # Start-up failed half-way: do not leak the main log handle.
                    cls._close_handles()
                    raise

            cls._stop_event.clear()
            cls._thread = threading.Thread(target=cls._run, daemon=True, name=conf.TRACER_THREAD_NAME)
            cls._thread.start()

    @classmethod
    def stop(cls) -> None:
        """
        Stops the tracer thread and performs a final flush of all logs.

        Does nothing when the tracer is not running. The output files are
        closed even if the final flush fails.
        """
        with cls._lock:
            if cls._thread is None:
                return
            cls._stop_event.set()
            thread_to_join = cls._thread
            # From here on ``log`` rejects new records.
            cls._thread = None

        # Join outside the lock: the flusher needs the lock to drain the buffer.
        thread_to_join.join()
        try:
            # Write whatever arrived after the last periodic flush.
            cls._flush_logs()
        finally:
            cls._close_handles()

    @classmethod
    def log(cls, level: LogLevel, component_id: str, event: str, details: Optional[Dict[str, Any]] = None) -> None:
        """
        Adds a log record to the buffer.

        This method is thread-safe. The record is dropped when the tracer is
        not running or when ``level`` is below the configured minimum.

        Args:
            level: The severity level of the log.
            component_id: The identifier of the component logging the event.
            event: The name of the event being logged.
            details: An optional dictionary of structured data. A shallow copy
                is stored, so later changes to the caller's dictionary do not
                alter the buffered record.
        """
        if cls._thread is None or level.value < cls._level.value:
            return

        record = LogRecord(
            timestamp=time.time(),
            thread_name=threading.current_thread().name or conf.THREAD_UNKNOWN_NAME,
            component_id=component_id,
            event=event,
            # Formatting is deferred until the next flush, so snapshot the
            # mapping now instead of keeping a reference to the caller's dict.
            details=dict(details) if details else {},
            level=level,
        )
        with cls._lock:
            cls._log_buffer.append(record)

    @classmethod
    def _run(cls) -> None:
        """
        The main loop for the tracer thread, flushing logs periodically.

        ``Event.wait`` returns True once ``stop`` sets the event, which ends
        the loop; a timeout (False) triggers one flush.
        """
        while not cls._stop_event.wait(cls._flush_interval):
            try:
                cls._flush_logs()
            except Exception as exc:
                # Keep the flusher alive: if it died, ``log`` would keep
                # filling a buffer that nobody drains. The batch that failed
                # has already been removed from the buffer and is lost.
                print(f"Tracer flush failed: {exc!r}", file=sys.stderr)

    @classmethod
    def _close_handles(cls) -> None:
        """Closes and forgets both output files, even if one close fails."""
        main_handle, cls._file_handle = cls._file_handle, None
        error_handle, cls._error_file_handle = cls._error_file_handle, None
        try:
            if main_handle:
                main_handle.close()
        finally:
            if error_handle:
                error_handle.close()

    @classmethod
    def _flush_logs(cls) -> None:
        """
        Formats and writes all buffered logs to the configured outputs.

        Column widths are computed per flush from the records in that batch,
        so alignment is consistent within a batch but may differ between
        batches. For ERROR records whose event is ``conf.LOG_EVENT_EXCEPTION``
        and that carry a ``'traceback'`` detail, the traceback goes to the
        error file (when one is open) and the main log line only gets a
        pointer to that file.
        """
        with cls._lock:
            if not cls._log_buffer:
                return
            # Swap the buffer out so producers are blocked only for the swap,
            # not for sorting, formatting or I/O.
            logs_to_flush = cls._log_buffer
            cls._log_buffer = []

        # Producers append concurrently, so arrival order is not strictly
        # chronological. The sort is stable: equal timestamps keep their order.
        logs_to_flush.sort(key=lambda r: r.timestamp)

        max_thread_width = max(len(r.thread_name) for r in logs_to_flush)
        max_id_width = max(len(r.component_id) for r in logs_to_flush)
        max_event_width = max(len(r.event) for r in logs_to_flush)

        for record in logs_to_flush:
            main_log_details = record.details
            is_exception = (
                record.level == LogLevel.ERROR
                and record.event == conf.LOG_EVENT_EXCEPTION
                and 'traceback' in record.details
            )
            if is_exception and cls._error_file_handle:
                # Write full traceback to error log
                error_header = f"--- Exception at {record.timestamp} in {record.component_id} ({record.thread_name}) ---\n"
                cls._error_file_handle.write(error_header)
                # f-string instead of '+' so a non-str traceback (e.g. a list
                # of lines) is written rather than raising TypeError.
                cls._error_file_handle.write(f"{record.details['traceback']}\n\n")
                # Flush per exception so the traceback is on disk right away.
                cls._error_file_handle.flush()
                # Modify details for the main log to be less verbose; work on
                # a copy so the record itself is left untouched.
                main_log_details = dict(record.details)
                main_log_details['traceback'] = conf.LOG_DETAIL_TRACEBACK_REDIRECT.format(cls._error_file)

            time_sec_str = time.strftime(conf.TRACER_TIMESTAMP_FORMAT, time.localtime(record.timestamp))
            # Fractional part of the epoch timestamp, truncated to whole ms.
            milliseconds = int((record.timestamp % 1) * 1000)
            time_str = time_sec_str + conf.TRACER_MILLISECOND_FORMAT.format(milliseconds)

            details_str = " | ".join(f"{k}={v}" for k, v in main_log_details.items())
            thread_str = record.thread_name.ljust(max_thread_width)
            id_str = record.component_id.ljust(max_id_width)
            event_str = record.event.ljust(max_event_width)
            # rstrip drops the trailing space left when there are no details.
            log_line = f"[{time_str}] [{thread_str}] [{id_str}] [{event_str}] {details_str}".rstrip()

            if cls._log_to_console:
                print(log_line)
            if cls._file_handle:
                cls._file_handle.write(log_line + '\n')

        # One flush per batch is enough for the main log.
        if cls._file_handle:
            cls._file_handle.flush()