Source code for ml.tracer

import json
import re
import sys
import threading
import time
from collections import defaultdict, namedtuple
from datetime import datetime
from typing import Any, Dict, List, Optional, TextIO

from . import conf as ml_conf
from .enums import LogLevel

# A structured representation of a single log entry.
LogRecord = namedtuple('LogRecord', ['timestamp', 'thread_name', 'component_id', 'event', 'details', 'level'])

class Tracer:
    """
    A thread-safe, centralized logging utility for the simulation engine.

    It buffers log records and periodically flushes them to the console
    and/or a file in a formatted, column-aligned manner.
    """
    _log_buffer: List[LogRecord] = []
    _lock = threading.Lock()
    _thread: Optional[threading.Thread] = None
    _active_events: Dict[int, Dict] = {}  # Tracks active events by their unique ID
    _event_counter: int = 0
    _stop_event = threading.Event()
    _level: LogLevel = LogLevel.INFO
    _flush_interval: float = 1.0
    _output_file: Optional[str] = None
    _log_to_console: bool = True
    _file_handle: Optional[TextIO] = None
    _error_file: Optional[str] = None
    _error_file_handle: Optional[TextIO] = None
    @classmethod
    def start(cls, level: LogLevel, flush_interval_seconds: float,
              output_file: Optional[str], log_to_console: bool = True,
              error_file: Optional[str] = "errors.log"):
        """
        Initializes and starts the tracer thread.

        Args:
            level: The minimum log level to record.
            flush_interval_seconds: How often to flush logs.
            output_file: An optional file path to write logs to.
            log_to_console: If True, logs will also be printed to the console.
            error_file: An optional file path to write detailed exception
                tracebacks to.
        """
        with cls._lock:
            if cls._thread is not None:
                print(ml_conf.TRACER_ERROR_ALREADY_STARTED, file=sys.stderr)
                return
            cls._level = level
            cls._flush_interval = flush_interval_seconds
            cls._output_file = output_file
            cls._log_to_console = log_to_console
            cls._error_file = error_file
            if cls._output_file:
                # Clear the file on start
                cls._file_handle = open(cls._output_file, 'w', encoding='utf-8')
            if cls._error_file:
                cls._error_file_handle = open(cls._error_file, 'w', encoding='utf-8')
            cls._stop_event.clear()
            cls._thread = threading.Thread(target=cls._run, daemon=True,
                                           name=ml_conf.TRACER_THREAD_NAME)
            cls._thread.start()
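
    # Illustrative usage sketch (not part of the module): a typical caller
    # starts the tracer once at startup and stops it at shutdown. The file
    # paths below are placeholders.
    #
    #     Tracer.start(level=LogLevel.INFO, flush_interval_seconds=0.5,
    #                  output_file="trace.log", log_to_console=False)
    #     try:
    #         ...  # run the simulation
    #     finally:
    #         Tracer.stop()  # joins the flush thread and flushes remaining records
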
    @classmethod
    def stop(cls):
        """Stops the tracer thread and performs a final flush of all logs."""
        thread_to_join = None
        with cls._lock:
            if cls._thread is None:
                return
            cls._stop_event.set()
            thread_to_join = cls._thread
            cls._thread = None
        if thread_to_join:
            thread_to_join.join()
        # Final flush after the thread has stopped
        cls._flush_logs()
        if cls._file_handle:
            cls._file_handle.close()
            cls._file_handle = None
        if cls._error_file_handle:
            cls._error_file_handle.close()
            cls._error_file_handle = None
    @classmethod
    def log(cls, level: LogLevel, component_id: str, event: str,
            details: Optional[Dict[str, Any]] = None, tick: Optional[int] = None):
        """
        Adds a log record to the buffer. This method is thread-safe.

        Args:
            level: The severity level of the log.
            component_id: The identifier of the component logging the event.
            event: The name of the event being logged.
            details: An optional dictionary of structured data. If a string is
                passed, it will be wrapped in a 'message' key.
            tick: The optional simulation tick number.

        Returns:
            A dictionary representation of the log entry (including the
            event_id assigned to ``*_START`` events), or None if the record
            was filtered out.
        """
        if cls._thread is None or level.value < cls._level.value:
            return None  # No log entry created, so return None
        timestamp = time.time()
        thread_name = threading.current_thread().name or ml_conf.THREAD_UNKNOWN_NAME
        # Ensure details is a mutable dictionary
        if isinstance(details, dict):
            log_details = details.copy()
        elif details:
            log_details = {ml_conf.LOG_DETAIL_KEY_MESSAGE: details}
        else:
            log_details = {}
        if tick is not None:
            log_details[ml_conf.LOG_DETAIL_KEY_TICK] = tick
        # Assign a unique event_id for START events
        if event.endswith('_START'):
            with cls._lock:
                cls._event_counter += 1
                event_id = cls._event_counter
                # Add event_id to the details dictionary
                log_details[ml_conf.TRACER_KEY_EVENT_ID] = event_id
                # Store the start event details for later lookup on _END
                cls._active_events[event_id] = {ml_conf.LOG_DETAIL_KEY_TICK: tick}
        elif event.endswith('_END'):
            event_id = log_details.get(ml_conf.TRACER_KEY_EVENT_ID)
            if event_id in cls._active_events:
                start_event_details = cls._active_events.pop(event_id)
                # The tick from the start event becomes the start_tick
                log_details[ml_conf.LOG_DETAIL_KEY_START_TICK] = \
                    start_event_details.get(ml_conf.LOG_DETAIL_KEY_TICK)
                # The tick from this _END call becomes the end_tick
                log_details[ml_conf.LOG_DETAIL_KEY_END_TICK] = tick
        # Create the LogRecord for internal buffering
        record = LogRecord(timestamp, thread_name, component_id, event, log_details, level)
        with cls._lock:
            cls._log_buffer.append(record)
        # Return a dictionary representation of the log entry for external use
        # (e.g., parent_event_id). This ensures the caller gets the full
        # details, including the event_id if it's a START event.
        return {
            ml_conf.TRACER_KEY_TIMESTAMP: timestamp,
            ml_conf.TRACER_KEY_THREAD_NAME: thread_name,
            ml_conf.TRACER_KEY_COMPONENT_ID: component_id,
            ml_conf.TRACER_KEY_ACTION: event,
            ml_conf.TRACER_KEY_DETAILS: log_details,
        }
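
    # Illustrative sketch (not part of the module): pairing a *_START with its
    # *_END via the returned event_id. This assumes the ml_conf key constants
    # resolve to the literal names shown ("details", "event_id"); component
    # and event names are placeholders.
    #
    #     start_rec = Tracer.log(LogLevel.INFO, "rotor_1", "SPIN_UP_START", tick=10)
    #     ...  # timed work happens here
    #     Tracer.log(LogLevel.INFO, "rotor_1", "SPIN_UP_END",
    #                details={"event_id": start_rec["details"]["event_id"]},
    #                tick=12)
    #
    # The _END record then carries start_tick=10 and end_tick=12, which
    # _parse_log below uses to reconstruct timed events.
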
    @classmethod
    def _run(cls):
        """The main loop for the tracer thread, flushing logs periodically."""
        while not cls._stop_event.wait(cls._flush_interval):
            cls._flush_logs()

    @classmethod
    def _flush_logs(cls):
        """
        Formats and writes all buffered logs to the configured outputs.

        This method calculates column widths dynamically for alignment.
        """
        logs_to_flush: List[LogRecord] = []
        with cls._lock:
            if not cls._log_buffer:
                return
            cls._log_buffer.sort(key=lambda r: r.timestamp)
            logs_to_flush = cls._log_buffer
            cls._log_buffer = []
        if not logs_to_flush:
            return
        max_thread_width = max(len(r.thread_name) for r in logs_to_flush)
        max_id_width = max(len(r.component_id) for r in logs_to_flush)
        max_event_width = max(len(r.event) for r in logs_to_flush)
        for record in logs_to_flush:
            is_exception = (record.level == LogLevel.ERROR
                            and record.event == ml_conf.LOG_EVENT_EXCEPTION
                            and ml_conf.LOG_DETAIL_KEY_TRACEBACK in record.details)
            if is_exception and cls._error_file_handle:
                # Write full traceback to error log
                error_header = (f"--- Exception at {record.timestamp} in "
                                f"{record.component_id} ({record.thread_name}) ---\n")
                cls._error_file_handle.write(error_header)
                cls._error_file_handle.write(record.details[ml_conf.LOG_DETAIL_KEY_TRACEBACK] + '\n\n')
                cls._error_file_handle.flush()
                # Modify details for the main log to be less verbose
                main_log_details = record.details.copy()
                main_log_details[ml_conf.LOG_DETAIL_KEY_TRACEBACK] = \
                    ml_conf.LOG_DETAIL_TRACEBACK_REDIRECT.format(cls._error_file)
            else:
                main_log_details = record.details
            dt_object = datetime.fromtimestamp(record.timestamp)
            time_str = dt_object.strftime(ml_conf.TRACER_TIMESTAMP_FORMAT)
            if isinstance(main_log_details, dict):
                # Always write the full details to the log file for parsing.
                details_str = " | ".join([f"{k}={v}" for k, v in main_log_details.items()])
            else:
                details_str = str(main_log_details)
            thread_str = record.thread_name.ljust(max_thread_width)
            id_str = record.component_id.ljust(max_id_width)
            event_str = record.event.ljust(max_event_width)
            log_line = f"[{time_str}] [{thread_str}] [{id_str}] [{event_str}] {details_str}".rstrip()
            if cls._log_to_console:
                print(log_line)
            if cls._file_handle:
                cls._file_handle.write(log_line + '\n')
        if cls._file_handle:
            cls._file_handle.flush()
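
# Illustrative sketch (not part of the module): assuming
# TRACER_TIMESTAMP_FORMAT is "%H:%M:%S.%f" (which is what LOG_PATTERN below
# expects), a flushed line is column-aligned like this (widths vary per
# flush batch; names are placeholders):
#
#     [14:03:07.251840] [MainThread ] [rotor_1   ] [SPIN_UP_START] tick=10 | event_id=1
#
# For ERROR/exception records the full traceback goes to the error file and
# the main line carries only the redirect notice.
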
# --- Trace Analysis Functionality ---

# Regex to parse a log line into its components
LOG_PATTERN = re.compile(
    r"\[(?P<time>\d{2}:\d{2}:\d{2}\.\d{6})\]\s+"
    r"\[(?P<thread_name>.*?)\s*\]\s+"
    r"\[(?P<component_id>.*?)\s*\]\s+"
    r"\[(?P<action>\w+)\s*\]\s*(?P<details>.*)"
)
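
# Illustrative sketch (not part of the module): exercising the pattern on a
# hand-written line; component and event names are placeholders.
#
#     m = LOG_PATTERN.match(
#         "[14:03:07.251840] [MainThread] [rotor_1] [SPIN_UP_END] start_tick=10 | end_tick=12")
#     m.group("action")   # -> "SPIN_UP_END"
#     m.group("details")  # -> "start_tick=10 | end_tick=12"
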
class TimedEvent:
    """Represents a timed event with a start, end, and children."""

    def __init__(self, timestamp, component_id, event_type, thread_name, details, tick):
        self.start_time = timestamp
        self.end_time = None
        self.component_id = component_id
        self.event_type = event_type.replace('_START', '')
        self.thread_name = thread_name
        self.details = details
        self.start_tick = tick
        self.end_tick = None
        self.children = []
        self.parent = None
        self.duration_ms = 0.0
    def complete(self, end_time, end_tick):
        """Marks the event as complete and calculates its duration."""
        self.end_time = end_time
        self.end_tick = end_tick
        if self.start_time:
            self.duration_ms = (self.end_time - self.start_time).total_seconds() * 1000
    def __repr__(self):
        return (f"{self.event_type} on {self.component_id} "
                f"({self.duration_ms:.3f} ms)")
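
# Illustrative sketch (not part of the module): a manually built TimedEvent,
# completed 1.5 ms after it started. The year-1900 datetime mirrors what
# strptime with a time-only format produces in _parse_log below.
#
#     from datetime import datetime, timedelta
#     t0 = datetime(1900, 1, 1, 14, 3, 7, 251840)
#     ev = TimedEvent(t0, "rotor_1", "SPIN_UP_START", "MainThread", {}, tick=10)
#     ev.complete(t0 + timedelta(milliseconds=1.5), end_tick=12)
#     ev.duration_ms  # -> 1.5
#     repr(ev)        # -> "SPIN_UP on rotor_1 (1.500 ms)"
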
def _parse_details(details_str: str) -> dict:
    """Parses the details string, which contains key=value pairs separated by '|'."""
    details = {}
    for part in details_str.split('|'):
        if '=' in part:
            key, value = part.split('=', 1)
            key = key.strip()
            val = value.strip()
            if val.isdigit():
                val = int(val)
            details[key] = val
    return details


def _parse_log(log_path):
    """Parses the trace log file and reconstructs the hierarchy of timed events."""
    event_stacks = defaultdict(list)
    root_events = []
    all_events_by_id = {}
    unlinked_children = []
    with open(log_path, 'r') as f:
        for line in f:
            match = LOG_PATTERN.match(line.strip())
            if not match:
                continue
            data = match.groupdict()
            timestamp = datetime.strptime(data['time'], ml_conf.TRACER_TIMESTAMP_FORMAT)
            thread_name = data['thread_name'].strip()
            component_id = data['component_id'].strip()
            action = data['action'].strip()
            details = _parse_details(data['details'])
            tick = details.get(ml_conf.LOG_DETAIL_KEY_TICK)
            # Use start_tick if present, else fall back to tick
            start_tick = details.get(ml_conf.LOG_DETAIL_KEY_START_TICK, tick)
            end_tick = details.get(ml_conf.LOG_DETAIL_KEY_END_TICK)
            if action.endswith('_START'):
                event = TimedEvent(timestamp, component_id, action, thread_name, details, start_tick)
                if ml_conf.TRACER_KEY_EVENT_ID in details:
                    all_events_by_id[details[ml_conf.TRACER_KEY_EVENT_ID]] = event
                stack = event_stacks[thread_name]
                if ml_conf.TRACER_KEY_PARENT_EVENT_ID in details:
                    # Explicit parent link; resolved after the full pass
                    unlinked_children.append(event)
                elif stack:
                    stack[-1].children.append(event)
                    event.parent = stack[-1]
                stack.append(event)
            elif action.endswith('_END'):
                stack = event_stacks[thread_name]
                event_type_to_match = action.replace('_END', '')
                found_idx = -1
                for i in range(len(stack) - 1, -1, -1):
                    if (stack[i].event_type == event_type_to_match
                            and stack[i].component_id == component_id):
                        found_idx = i
                        break
                if found_idx != -1:
                    # Pop (and complete) the matched event and anything above it
                    for i in range(len(stack) - 1, found_idx - 1, -1):
                        event = stack.pop()
                        event.complete(timestamp, end_tick if end_tick is not None else tick)
    for event in unlinked_children:
        parent_id = event.details[ml_conf.TRACER_KEY_PARENT_EVENT_ID]
        parent = all_events_by_id.get(parent_id)
        if parent:
            parent.children.append(event)
            event.parent = parent
    for event in all_events_by_id.values():
        if event.parent is None:
            root_events.append(event)
        if event.children:
            event.children.sort(key=lambda e: e.start_time)
    return sorted(root_events, key=lambda e: e.start_time), event_stacks


def _print_event_hierarchy(event, indent=0, file=sys.stdout):
    """Recursively prints the event hierarchy with indentation."""
    prefix = " " * indent
    strategy_info = ""
    if (event.event_type == ml_conf.LOG_EVENT_EXECUTION_STRATEGY
            and ml_conf.LOG_DETAIL_KEY_STRATEGY in event.details):
        strategy_info = f" (strategy: {event.details[ml_conf.LOG_DETAIL_KEY_STRATEGY]})"
    details_parts = [f"Duration: {event.duration_ms:8.3f} ms"]
    if event.start_time:
        details_parts.append(f"Start Time: {event.start_time.strftime(ml_conf.TRACER_TIMESTAMP_FORMAT)}")
    if event.end_time:
        details_parts.append(f"End Time: {event.end_time.strftime(ml_conf.TRACER_TIMESTAMP_FORMAT)}")
    if event.start_tick is not None:
        details_parts.append(f"Start Tick: {event.start_tick}")
    if event.end_tick is not None:
        details_parts.append(f"End Tick: {event.end_tick}")
    details_line = " ".join(details_parts)
    print(f"{prefix}- {event.component_id:<30} [{event.event_type:<24}] {details_line}{strategy_info}",
          file=file)
    for child in event.children:
        _print_event_hierarchy(child, indent + 1, file=file)
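
# Illustrative sketch (not part of the module): what _parse_details recovers
# from a raw details column; integer-looking values are coerced to int. The
# "strategy" key and value are placeholders.
#
#     _parse_details("tick=10 | event_id=1 | strategy=sequential")
#     # -> {'tick': 10, 'event_id': 1, 'strategy': 'sequential'}
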
"""Recursively converts a TimedEvent and its children to a dictionary.""" event_name_str = event.event_type.replace('_START', '').replace('_END', '') # Clean up details by removing redundant tick information before serialization. clean_details = event.details.copy() clean_details.pop('tick', None) clean_details.pop('start_tick', None) clean_details.pop('end_tick', None) event_dict = { "component_id": event.component_id, "event_name": event_name_str, "action": event.event_type, "duration_ms": event.duration_ms, "start_time": event.start_time.isoformat() if event.start_time else None, "end_time": event.end_time.isoformat() if event.end_time else None, "start_tick": event.start_tick, "end_tick": event.end_tick, "details": clean_details, "thread_name": event.thread_name, "children": [_convert_event_to_dict(child) for child in event.children] } return event_dict # --- Perfetto JSON Generation --- def _find_all_relationships_from_timed_events(events: List[TimedEvent], parent_map: Dict, all_threads: set): """Recursively traverses TimedEvents to find every thread and its creator.""" for event in events: if event.thread_name: all_threads.add(event.thread_name) details = event.details or {} child_thread = details.get('thread_name') parent_thread = details.get('creator_thread') if child_thread and parent_thread and child_thread != parent_thread: all_threads.add(child_thread) all_threads.add(parent_thread) if child_thread not in parent_map: parent_map[child_thread] = parent_thread if event.children: _find_all_relationships_from_timed_events(event.children, parent_map, all_threads) def _build_thread_order_list(current_thread: str, children_map: Dict, ordered_list: List, visited_set: set): """Performs a depth-first traversal of the hierarchy map to build the ordered list.""" if current_thread in visited_set: return ordered_list.append(current_thread) visited_set.add(current_thread) for child in sorted(children_map.get(current_thread, [])): _build_thread_order_list(child, children_map, ordered_list, visited_set) def _create_perfetto_metadata_events(pid: int, thread_to_tid: Dict) -> List[Dict]: """Creates metadata events to name the process and threads in Perfetto.""" metadata = [{"name": "process_name", "ph": "M", "pid": pid, "args": {"name": "Multirotor Simulation"}}] for name, tid in thread_to_tid.items(): metadata.append({"name": "thread_name", "ph": "M", "pid": pid, "tid": tid, "args": {"name": name}}) return metadata def _convert_timed_event_to_perfetto(event: TimedEvent, pid: int, base_timestamp_us: float, thread_to_tid: Dict) -> Dict: """Converts a single TimedEvent to the Perfetto Trace Event Format.""" start_us = (event.start_time.timestamp() * 1_000_000) - base_timestamp_us duration_us = event.duration_ms * 1000 return { "name": event.event_type, "cat": event.component_id, "ph": "X", "ts": start_us, "dur": duration_us, "pid": pid, "tid": thread_to_tid.get(event.thread_name), "args": event.details } def _flatten_events_for_perfetto(events: List[TimedEvent], pid: int, base_timestamp_us: float, thread_to_tid: Dict) -> List[Dict]: """Recursively flattens the TimedEvent hierarchy for Perfetto.""" perfetto_events = [] for event in events: if event.start_time and event.end_time: perfetto_events.append(_convert_timed_event_to_perfetto(event, pid, base_timestamp_us, thread_to_tid)) if event.children: perfetto_events.extend(_flatten_events_for_perfetto(event.children, pid, base_timestamp_us, thread_to_tid)) return perfetto_events def _generate_perfetto_json(root_events: List[TimedEvent], out_stream: 
def _generate_perfetto_json(root_events: List[TimedEvent], out_stream: TextIO):
    """Generates and writes the Perfetto JSON to the given stream."""
    # 1. Build hierarchy from creator_thread attributes
    parent_map = {}
    all_threads_set = set()
    _find_all_relationships_from_timed_events(root_events, parent_map, all_threads_set)
    children_map = defaultdict(list)
    for child, parent in parent_map.items():
        children_map[parent].append(child)
    # 2. Create the final ordered list of threads
    main_thread_name = 'MainThread'
    sim_hierarchy_list = []
    visited_set = set()
    if 'top_thread' in all_threads_set:
        _build_thread_order_list('top_thread', children_map, sim_hierarchy_list, visited_set)
    other_root_threads = sorted(
        [t for t in all_threads_set if t not in visited_set and t not in parent_map])
    final_ordered_list = [main_thread_name]
    final_ordered_list.extend([t for t in sim_hierarchy_list if t != main_thread_name])
    final_ordered_list.extend([t for t in other_root_threads if t != main_thread_name])
    all_threads = final_ordered_list
    # 3. Create thread ID mapping (starting from 1)
    thread_to_tid = {name: i + 1 for i, name in enumerate(all_threads) if name}
    # 4. Generate Perfetto events
    base_timestamp_us = root_events[0].start_time.timestamp() * 1_000_000
    pid = 1
    metadata_events = _create_perfetto_metadata_events(pid, thread_to_tid)
    perfetto_events = _flatten_events_for_perfetto(root_events, pid, base_timestamp_us, thread_to_tid)
    # 5. Write to stream
    output_data = {'traceEvents': metadata_events + perfetto_events}
    json.dump(output_data, out_stream, indent=2)
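
# Illustrative sketch (not part of the module): the emitted JSON follows the
# Chrome/Perfetto Trace Event Format; the event values below are placeholders.
#
#     {
#       "traceEvents": [
#         {"name": "process_name", "ph": "M", "pid": 1,
#          "args": {"name": "Multirotor Simulation"}},
#         {"name": "thread_name", "ph": "M", "pid": 1, "tid": 1,
#          "args": {"name": "MainThread"}},
#         {"name": "SPIN_UP", "cat": "rotor_1", "ph": "X",
#          "ts": 0.0, "dur": 1834.0, "pid": 1, "tid": 1, "args": {}}
#       ]
#     }
#
# Such a file can be opened in ui.perfetto.dev or chrome://tracing.
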
def analyze_trace_log(log_path: str, output_format: str = 'text',
                      output_file: Optional[str] = None):
    """
    Main analysis function. Parses the log and prints a performance report.

    Args:
        log_path: Path to the simulation trace log file.
        output_format: The desired output format. Can be 'text', 'json', or
            'json:perfetto'.
        output_file: Optional path to write the output to. If None, prints
            to stdout.
    """
    try:
        all_steps, open_stacks = _parse_log(log_path)
    except FileNotFoundError:
        print(f"Error: Log file not found at '{log_path}'")
        return
    complete_root_events = [event for event in all_steps if event.end_time is not None]
    # Determine the output stream
    out_stream = open(output_file, 'w', encoding='utf-8') if output_file else sys.stdout
    try:
        if not complete_root_events:
            if output_format.startswith('json'):
                json.dump([], out_stream, indent=2)
            else:  # text
                print("Could not find any fully completed simulation steps in the log.",
                      file=out_stream)
            return
        if output_format == 'json':
            json_output = [_convert_event_to_dict(event) for event in complete_root_events]
            json.dump(json_output, out_stream, indent=2)
        elif output_format == 'json:perfetto':
            print("Generating Perfetto-compatible JSON trace...", file=sys.stderr)
            _generate_perfetto_json(complete_root_events, out_stream)
        else:  # text format
            print("\n" + "=" * 80, file=out_stream)
            print("Performance Analysis of Simulation Step(s)", file=out_stream)
            print("=" * 80, file=out_stream)
            for event in complete_root_events:
                _print_event_hierarchy(event, file=out_stream)
                print("-" * 80, file=out_stream)
    finally:
        if output_file:
            out_stream.close()
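
# Illustrative usage sketch (not part of the module); paths are placeholders.
#
#     analyze_trace_log("trace.log")                         # text report to stdout
#     analyze_trace_log("trace.log", output_format="json",
#                       output_file="report.json")
#     analyze_trace_log("trace.log", output_format="json:perfetto",
#                       output_file="trace.perfetto.json")   # viewable in ui.perfetto.dev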