Source code for ml.tracer

import threading
import time
from typing import List, Dict, Optional, Any, TextIO
from collections import namedtuple
import sys
from .enums import LogLevel
from . import conf

# Immutable record describing one buffered log entry. Fields, in positional
# order: when it was logged, the logging thread's name, the component that
# logged it, the event name, the structured details, and the severity level.
LogRecord = namedtuple(
    'LogRecord',
    'timestamp thread_name component_id event details level',
)

class Tracer:
    """
    A thread-safe, centralized logging utility for the simulation engine.

    Records are appended to an in-memory buffer by :meth:`log` and written
    out in batches by a background daemon thread, to the console and/or a
    file, in a formatted, column-aligned manner. Exception tracebacks can be
    diverted to a separate error file.

    All state is stored on the class and every method is a classmethod, so
    the tracer is used directly as ``Tracer.start(...)`` / ``Tracer.log(...)``
    / ``Tracer.stop()`` without instantiation.
    """

    # Records waiting for the next flush; guarded by ``_lock``.
    _log_buffer: List[LogRecord] = []
    # Protects ``_log_buffer`` and the start/stop state transitions.
    _lock = threading.Lock()
    # The background flusher thread, or None while the tracer is stopped.
    _thread: Optional[threading.Thread] = None
    # Set by ``stop()`` to make the flusher loop exit.
    _stop_event = threading.Event()
    # Records below this level are discarded by ``log()``.
    _level: LogLevel = LogLevel.INFO
    # Seconds between periodic flushes.
    _flush_interval: float = 1.0
    # Path and open handle of the main log file (None when not logging to a file).
    _output_file: Optional[str] = None
    _file_handle: Optional[TextIO] = None
    # Whether formatted lines are also printed to stdout.
    _log_to_console: bool = True
    # Path and open handle of the traceback file (None when disabled).
    _error_file: Optional[str] = None
    _error_file_handle: Optional[TextIO] = None

    @classmethod
    def start(cls, level: LogLevel, flush_interval_seconds: float, output_file: Optional[str],
              log_to_console: bool = True, error_file: Optional[str] = "errors.log"):
        """
        Initializes and starts the tracer thread.

        If the tracer is already running, an error message is printed to
        stderr and the call has no other effect.

        Args:
            level: The minimum log level to record.
            flush_interval_seconds: How often to flush logs.
            output_file: An optional file path to write logs to. The file is
                truncated on start.
            log_to_console: If True, logs will also be printed to the console.
            error_file: An optional file path to write detailed exception
                tracebacks to. The file is truncated on start.

        Raises:
            OSError: If one of the output files cannot be opened. Any handle
                opened before the failure is closed again and the tracer
                stays stopped.
        """
        with cls._lock:
            if cls._thread is not None:
                print(conf.TRACER_ERROR_ALREADY_STARTED, file=sys.stderr)
                return

            cls._level = level
            cls._flush_interval = flush_interval_seconds
            cls._output_file = output_file
            cls._log_to_console = log_to_console
            cls._error_file = error_file

            try:
                if cls._output_file:
                    # Mode 'w' clears the file on start.
                    cls._file_handle = open(cls._output_file, 'w', encoding='utf-8')
                if cls._error_file:
                    cls._error_file_handle = open(cls._error_file, 'w', encoding='utf-8')

                cls._stop_event.clear()
                flusher = threading.Thread(target=cls._run, daemon=True, name=conf.TRACER_THREAD_NAME)
                flusher.start()
            except BaseException:
                # A partially completed start must not leak open handles.
                cls._close_handles()
                raise

            # Published only after the thread really started, so stop() never
            # tries to join a thread that was never running.
            cls._thread = flusher

    @classmethod
    def stop(cls):
        """
        Stops the tracer thread and performs a final flush of all logs.

        Does nothing if the tracer is not running. The output files are
        closed even if the final flush raises.
        """
        with cls._lock:
            if cls._thread is None:
                return
            cls._stop_event.set()
            thread_to_join = cls._thread
            cls._thread = None

        # Join outside the lock: the flusher needs the lock to drain the buffer.
        thread_to_join.join()

        try:
            # Final flush after the thread has stopped.
            cls._flush_logs()
        finally:
            cls._close_handles()

    @classmethod
    def log(cls, level: LogLevel, component_id: str, event: str, details: Optional[Dict[str, Any]] = None):
        """
        Adds a log record to the buffer. This method is thread-safe.

        The record is dropped if the tracer is not running or if ``level`` is
        below the configured minimum level.

        Args:
            level: The severity level of the log.
            component_id: The identifier of the component logging the event.
            event: The name of the event being logged.
            details: An optional dictionary of structured data.
        """
        if cls._thread is None or level.value < cls._level.value:
            return

        record = LogRecord(
            timestamp=time.time(),
            thread_name=threading.current_thread().name or conf.THREAD_UNKNOWN_NAME,
            component_id=component_id,
            event=event,
            # Shallow snapshot: the record is formatted later on another
            # thread, so keys the caller adds or removes after this call
            # must not leak into the logged output.
            details=dict(details) if details else {},
            level=level,
        )
        with cls._lock:
            cls._log_buffer.append(record)

    @classmethod
    def _close_handles(cls):
        """Closes and forgets both output file handles; a no-op for handles that are not open."""
        main_handle, cls._file_handle = cls._file_handle, None
        error_handle, cls._error_file_handle = cls._error_file_handle, None
        try:
            if main_handle is not None:
                main_handle.close()
        finally:
            # Still close the error file if closing the main file raised.
            if error_handle is not None:
                error_handle.close()

    @classmethod
    def _run(cls):
        """The main loop for the tracer thread, flushing logs periodically."""
        import traceback

        # Event.wait() returns False on timeout (time to flush) and True once
        # stop() has set the event (time to exit).
        while not cls._stop_event.wait(cls._flush_interval):
            try:
                cls._flush_logs()
            except Exception:
                # Keep the flusher alive: if it died, log() would keep
                # appending to a buffer that nothing ever drains.
                traceback.print_exc()

    @classmethod
    def _flush_logs(cls):
        """
        Formats and writes all buffered logs to the configured outputs.

        Column widths are computed from the records in the current batch, so
        alignment is consistent within one flush. For ERROR records whose
        event is ``conf.LOG_EVENT_EXCEPTION`` and that carry a ``'traceback'``
        detail, the full traceback is written to the error file (when one is
        open) and replaced in the main log line by a redirect notice.
        """
        with cls._lock:
            if not cls._log_buffer:
                return
            # Swap the buffer out so the lock is held only briefly.
            logs_to_flush = cls._log_buffer
            cls._log_buffer = []

        # Producers on different threads may append slightly out of order.
        logs_to_flush.sort(key=lambda r: r.timestamp)

        max_thread_width = max(len(r.thread_name) for r in logs_to_flush)
        max_id_width = max(len(r.component_id) for r in logs_to_flush)
        max_event_width = max(len(r.event) for r in logs_to_flush)

        for record in logs_to_flush:
            is_exception = (
                record.level == LogLevel.ERROR
                and record.event == conf.LOG_EVENT_EXCEPTION
                and 'traceback' in record.details
            )

            if is_exception and cls._error_file_handle:
                # Write full traceback to error log.
                error_header = f"--- Exception at {record.timestamp} in {record.component_id} ({record.thread_name}) ---\n"
                cls._error_file_handle.write(error_header)
                cls._error_file_handle.write(record.details['traceback'] + '\n\n')
                cls._error_file_handle.flush()

                # Modify a copy of the details so the main log is less verbose.
                main_log_details = record.details.copy()
                main_log_details['traceback'] = conf.LOG_DETAIL_TRACEBACK_REDIRECT.format(cls._error_file)
            else:
                main_log_details = record.details

            # Whole seconds via strftime, then the millisecond part appended.
            time_sec_str = time.strftime(conf.TRACER_TIMESTAMP_FORMAT, time.localtime(record.timestamp))
            milliseconds = int((record.timestamp % 1) * 1000)
            time_str = time_sec_str + conf.TRACER_MILLISECOND_FORMAT.format(milliseconds)

            details_str = " | ".join([f"{k}={v}" for k, v in main_log_details.items()])

            thread_str = record.thread_name.ljust(max_thread_width)
            id_str = record.component_id.ljust(max_id_width)
            event_str = record.event.ljust(max_event_width)

            # rstrip() removes the trailing space left when there are no details.
            log_line = f"[{time_str}] [{thread_str}] [{id_str}] [{event_str}] {details_str}".rstrip()

            if cls._log_to_console:
                print(log_line)
            if cls._file_handle:
                cls._file_handle.write(log_line + '\n')

        # One flush per batch rather than per line.
        if cls._file_handle:
            cls._file_handle.flush()