import json
import re
import sys
import threading
import time
from collections import defaultdict, namedtuple
from datetime import datetime
from typing import Any, Dict, List, Optional, TextIO

from . import conf as ml_conf
from .enums import LogLevel

# A structured representation of a single log entry.
LogRecord = namedtuple('LogRecord', ['timestamp', 'thread_name', 'component_id', 'event', 'details', 'level'])
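# Example record (illustrative values):
#   LogRecord(timestamp=1712000000.123, thread_name='MainThread',
#             component_id='motor_1', event='SPIN_START',
#             details={'tick': 42, 'event_id': 7}, level=LogLevel.DEBUG)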
class Tracer:
"""
A thread-safe, centralized logging utility for the simulation engine.
It buffers log records and periodically flushes them to the console and/or
a file in a formatted, column-aligned manner.
"""
_log_buffer: List[LogRecord] = []
_lock = threading.Lock()
_thread: Optional[threading.Thread] = None
_active_events: Dict[int, Dict] = {} # Tracks active events by their unique ID
_event_counter: int = 0
_stop_event = threading.Event()
_level: LogLevel = LogLevel.INFO
_flush_interval: float = 1.0
_output_file: Optional[str] = None
_log_to_console: bool = True
_file_handle: Optional[TextIO] = None
_error_file: Optional[str] = None
_error_file_handle: Optional[TextIO] = None
@classmethod
def start(cls, level: LogLevel, flush_interval_seconds: float, output_file: Optional[str], log_to_console: bool = True, error_file: Optional[str] = "errors.log"):
"""
Initializes and starts the tracer thread.
Args:
level: The minimum log level to record.
flush_interval_seconds: How often to flush logs.
output_file: An optional file path to write logs to.
log_to_console: If True, logs will also be printed to the console.
error_file: An optional file path to write detailed exception
tracebacks to.
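
        Example (illustrative values)::

            Tracer.start(LogLevel.DEBUG, flush_interval_seconds=0.5,
                         output_file='trace.log', log_to_console=False)
            ...
            Tracer.stop()  # final flush; closes any open files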
"""
with cls._lock:
if cls._thread is not None:
print(ml_conf.TRACER_ERROR_ALREADY_STARTED, file=sys.stderr)
return
cls._level = level
cls._flush_interval = flush_interval_seconds
cls._output_file = output_file
cls._log_to_console = log_to_console
cls._error_file = error_file
if cls._output_file:
# Clear the file on start
cls._file_handle = open(cls._output_file, 'w', encoding='utf-8')
if cls._error_file:
cls._error_file_handle = open(cls._error_file, 'w', encoding='utf-8')
cls._stop_event.clear()
cls._thread = threading.Thread(target=cls._run, daemon=True, name=ml_conf.TRACER_THREAD_NAME)
cls._thread.start()
@classmethod
def stop(cls):
"""Stops the tracer thread and performs a final flush of all logs."""
thread_to_join = None
with cls._lock:
if cls._thread is None:
return
cls._stop_event.set()
thread_to_join = cls._thread
cls._thread = None
if thread_to_join:
thread_to_join.join()
# Final flush after the thread has stopped
cls._flush_logs()
if cls._file_handle:
cls._file_handle.close()
cls._file_handle = None
if cls._error_file_handle:
cls._error_file_handle.close()
cls._error_file_handle = None
@classmethod
def log(cls, level: LogLevel, component_id: str, event: str, details: Optional[Dict[str, Any]] = None, tick: Optional[int] = None):
"""
Adds a log record to the buffer.
This method is thread-safe.
Args:
level: The severity level of the log.
component_id: The identifier of the component logging the event.
event: The name of the event being logged.
            details: An optional dictionary of structured data. A non-dict
                value is wrapped under a 'message' key.
            tick: The optional simulation tick number.

        Returns:
            A dictionary representation of the buffered entry, or None if
            the tracer is not running or the record is below the configured
            level.
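
        Example (illustrative; the detail key names come from ml_conf)::

            start = Tracer.log(LogLevel.DEBUG, 'motor_1', 'SPIN_START', tick=10)
            Tracer.log(LogLevel.DEBUG, 'motor_1', 'SPIN_END',
                       details={'event_id': start['details']['event_id']},
                       tick=12)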
"""
if cls._thread is None or level.value < cls._level.value:
return None # No log entry created, so return None
timestamp = time.time()
thread_name = threading.current_thread().name or ml_conf.THREAD_UNKNOWN_NAME
        # Ensure details is a mutable dictionary; wrap non-dict values.
        if isinstance(details, dict):
            log_details = details.copy()
        elif details:
            log_details = {ml_conf.LOG_DETAIL_KEY_MESSAGE: details}
        else:
            log_details = {}
if tick is not None:
log_details[ml_conf.LOG_DETAIL_KEY_TICK] = tick
# Assign a unique event_id for START events
if event.endswith('_START'):
with cls._lock:
cls._event_counter += 1
event_id = cls._event_counter
log_details[ml_conf.TRACER_KEY_EVENT_ID] = event_id # Add event_id to the details dictionary
# Store the start event details for later lookup on _END
cls._active_events[event_id] = {ml_conf.LOG_DETAIL_KEY_TICK: tick}
        elif event.endswith('_END'):
            event_id = log_details.get(ml_conf.TRACER_KEY_EVENT_ID)
            # Pop under the lock, for consistency with the _START branch.
            with cls._lock:
                start_event_details = cls._active_events.pop(event_id, None)
            if start_event_details is not None:
                # The tick from the start event becomes the start_tick
                log_details[ml_conf.LOG_DETAIL_KEY_START_TICK] = start_event_details.get(ml_conf.LOG_DETAIL_KEY_TICK)
                # The tick from this _END call becomes the end_tick
                log_details[ml_conf.LOG_DETAIL_KEY_END_TICK] = tick
# Create the LogRecord for internal buffering
record = LogRecord(timestamp, thread_name, component_id, event, log_details, level)
with cls._lock:
cls._log_buffer.append(record)
# Return a dictionary representation of the log entry for external use (e.g., parent_event_id)
# This ensures the caller gets the full details, including the event_id if it's a START event.
        return {
            ml_conf.TRACER_KEY_TIMESTAMP: timestamp,
            ml_conf.TRACER_KEY_THREAD_NAME: thread_name,
            ml_conf.TRACER_KEY_COMPONENT_ID: component_id,
            ml_conf.TRACER_KEY_ACTION: event,
            ml_conf.TRACER_KEY_DETAILS: log_details,
        }
@classmethod
def _run(cls):
"""The main loop for the tracer thread, flushing logs periodically."""
while not cls._stop_event.wait(cls._flush_interval):
cls._flush_logs()
@classmethod
def _flush_logs(cls):
"""
Formats and writes all buffered logs to the configured outputs.
This method calculates column widths dynamically for alignment.
"""
logs_to_flush: List[LogRecord] = []
with cls._lock:
if not cls._log_buffer:
return
cls._log_buffer.sort(key=lambda r: r.timestamp)
logs_to_flush = cls._log_buffer
cls._log_buffer = []
if not logs_to_flush:
return
max_thread_width = max(len(r.thread_name) for r in logs_to_flush)
max_id_width = max(len(r.component_id) for r in logs_to_flush)
max_event_width = max(len(r.event) for r in logs_to_flush)
for record in logs_to_flush:
is_exception = record.level == LogLevel.ERROR and record.event == ml_conf.LOG_EVENT_EXCEPTION and ml_conf.LOG_DETAIL_KEY_TRACEBACK in record.details
if is_exception and cls._error_file_handle:
# Write full traceback to error log
                error_header = f"--- Exception at {datetime.fromtimestamp(record.timestamp).isoformat()} in {record.component_id} ({record.thread_name}) ---\n"
cls._error_file_handle.write(error_header)
cls._error_file_handle.write(record.details[ml_conf.LOG_DETAIL_KEY_TRACEBACK] + '\n\n')
cls._error_file_handle.flush()
# Modify details for the main log to be less verbose
main_log_details = record.details.copy()
main_log_details[ml_conf.LOG_DETAIL_KEY_TRACEBACK] = ml_conf.LOG_DETAIL_TRACEBACK_REDIRECT.format(cls._error_file)
else:
main_log_details = record.details
dt_object = datetime.fromtimestamp(record.timestamp)
time_str = dt_object.strftime(ml_conf.TRACER_TIMESTAMP_FORMAT)
if isinstance(main_log_details, dict):
# Always write the full details to the log file for parsing.
details_str = " | ".join([f"{k}={v}" for k, v in main_log_details.items()])
else:
details_str = str(main_log_details)
thread_str = record.thread_name.ljust(max_thread_width)
id_str = record.component_id.ljust(max_id_width)
event_str = record.event.ljust(max_event_width)
log_line = f"[{time_str}] [{thread_str}] [{id_str}] [{event_str}] {details_str}".rstrip()
if cls._log_to_console:
print(log_line)
if cls._file_handle:
cls._file_handle.write(log_line + '\n')
if cls._file_handle:
cls._file_handle.flush()
# --- Trace Analysis Functionality ---
# Regex to parse a log line into its components
LOG_PATTERN = re.compile(
r"\[(?P<time>\d{2}:\d{2}:\d{2}\.\d{6})\]\s+"
r"\[(?P<thread_name>.*?)\s*\]\s+"
r"\[(?P<component_id>.*?)\s*\]\s+"
r"\[(?P<action>\w+)\s*\]\s*(?P<details>.*)"
)
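# Example of a line this pattern matches (illustrative; assumes
# TRACER_TIMESTAMP_FORMAT is '%H:%M:%S.%f'):
#   [12:00:01.234567] [MainThread] [motor_1   ] [SPIN_START  ] tick=42 | event_id=7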
class TimedEvent:
"""Represents a timed event with a start, end, and children."""
def __init__(self, timestamp, component_id, event_type, thread_name, details, tick):
self.start_time = timestamp
self.end_time = None
self.component_id = component_id
self.event_type = event_type.replace('_START', '')
self.thread_name = thread_name
self.details = details
self.start_tick = tick
self.end_tick = None
self.children = []
self.parent = None
self.duration_ms = 0.0
def complete(self, end_time, end_tick):
"""Marks the event as complete and calculates its duration."""
self.end_time = end_time
self.end_tick = end_tick
if self.start_time:
self.duration_ms = (self.end_time - self.start_time).total_seconds() * 1000
def __repr__(self):
return (f"{self.event_type} on {self.component_id} "
f"({self.duration_ms:.3f} ms)")
def _parse_details(details_str: str) -> dict:
"""
Parses the details string, which contains key=value pairs
separated by '|'.
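
    Example::

        >>> _parse_details('tick=42 | event_id=7 | strategy=euler')
        {'tick': 42, 'event_id': 7, 'strategy': 'euler'}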
"""
details = {}
for part in details_str.split('|'):
if '=' in part:
key, value = part.split('=', 1)
key = key.strip()
val = value.strip()
            # int() also handles negative values that str.isdigit() misses.
            try:
                val = int(val)
            except ValueError:
                pass
details[key] = val
return details
def _parse_log(log_path):
"""
Parses the trace log file and reconstructs the hierarchy of timed events.
"""
event_stacks = defaultdict(list)
root_events = []
all_events_by_id = {}
unlinked_children = []
    with open(log_path, 'r', encoding='utf-8') as f:
for line in f:
match = LOG_PATTERN.match(line.strip())
if not match:
continue
data = match.groupdict()
timestamp = datetime.strptime(data['time'], ml_conf.TRACER_TIMESTAMP_FORMAT)
thread_name = data['thread_name'].strip()
component_id = data['component_id'].strip()
action = data['action'].strip()
details = _parse_details(data['details'])
tick = details.get(ml_conf.LOG_DETAIL_KEY_TICK)
start_tick = details.get(ml_conf.LOG_DETAIL_KEY_START_TICK, tick) # Use start_tick if present, else fallback to tick
end_tick = details.get(ml_conf.LOG_DETAIL_KEY_END_TICK)
if action.endswith('_START'):
event = TimedEvent(timestamp, component_id, action, thread_name, details, start_tick)
                if ml_conf.TRACER_KEY_EVENT_ID in details:
                    all_events_by_id[details[ml_conf.TRACER_KEY_EVENT_ID]] = event
stack = event_stacks[thread_name]
if ml_conf.TRACER_KEY_PARENT_EVENT_ID in details:
unlinked_children.append(event)
elif stack:
stack[-1].children.append(event)
event.parent = stack[-1]
stack.append(event)
elif action.endswith('_END'):
stack = event_stacks[thread_name]
event_type_to_match = action.replace('_END', '')
found_idx = -1
for i in range(len(stack) - 1, -1, -1):
if stack[i].event_type == event_type_to_match and stack[i].component_id == component_id:
found_idx = i
break
                if found_idx != -1:
                    # Pop and complete the matched event plus any still-open
                    # events nested above it, closing dangling children.
                    for _ in range(len(stack) - 1, found_idx - 1, -1):
                        event = stack.pop()
                        event.complete(timestamp, end_tick if end_tick is not None else tick)
for event in unlinked_children:
parent_id = event.details[ml_conf.TRACER_KEY_PARENT_EVENT_ID]
parent = all_events_by_id.get(parent_id)
if parent:
parent.children.append(event)
event.parent = parent
for event in all_events_by_id.values():
if event.parent is None:
root_events.append(event)
if event.children:
event.children.sort(key=lambda e: e.start_time)
return sorted(root_events, key=lambda e: e.start_time), event_stacks
def _print_event_hierarchy(event, indent=0, file=sys.stdout):
"""Recursively prints the event hierarchy with indentation."""
prefix = " " * indent
strategy_info = ""
if event.event_type == ml_conf.LOG_EVENT_EXECUTION_STRATEGY and ml_conf.LOG_DETAIL_KEY_STRATEGY in event.details:
strategy_info = f" (strategy: {event.details[ml_conf.LOG_DETAIL_KEY_STRATEGY]})"
details_parts = []
details_parts.append(f"Duration: {event.duration_ms:8.3f} ms")
if event.start_time:
details_parts.append(f"Start Time: {event.start_time.strftime(ml_conf.TRACER_TIMESTAMP_FORMAT)}")
if event.end_time:
details_parts.append(f"End Time: {event.end_time.strftime(ml_conf.TRACER_TIMESTAMP_FORMAT)}")
if event.start_tick is not None:
details_parts.append(f"Start Tick: {event.start_tick}")
if event.end_tick is not None:
details_parts.append(f"End Tick: {event.end_tick}")
details_line = " ".join(details_parts)
print(f"{prefix}- {event.component_id:<30} [{event.event_type:<24}] {details_line}{strategy_info}", file=file)
for child in event.children:
_print_event_hierarchy(child, indent + 1, file=file)
def _convert_event_to_dict(event: TimedEvent) -> Dict[str, Any]:
"""Recursively converts a TimedEvent and its children to a dictionary."""
    # event_type already had its '_START' suffix stripped in __init__.
    event_name_str = event.event_type
    # Drop redundant tick keys from details; ticks are emitted as
    # top-level fields below.
    clean_details = event.details.copy()
    clean_details.pop(ml_conf.LOG_DETAIL_KEY_TICK, None)
    clean_details.pop(ml_conf.LOG_DETAIL_KEY_START_TICK, None)
    clean_details.pop(ml_conf.LOG_DETAIL_KEY_END_TICK, None)
event_dict = {
"component_id": event.component_id,
"event_name": event_name_str,
"action": event.event_type,
"duration_ms": event.duration_ms,
"start_time": event.start_time.isoformat() if event.start_time else None,
"end_time": event.end_time.isoformat() if event.end_time else None,
"start_tick": event.start_tick,
"end_tick": event.end_tick,
"details": clean_details,
"thread_name": event.thread_name,
"children": [_convert_event_to_dict(child) for child in event.children]
}
return event_dict
# --- Perfetto JSON Generation ---
def _find_all_relationships_from_timed_events(events: List[TimedEvent], parent_map: Dict, all_threads: set):
"""Recursively traverses TimedEvents to find every thread and its creator."""
for event in events:
if event.thread_name:
all_threads.add(event.thread_name)
details = event.details or {}
child_thread = details.get('thread_name')
parent_thread = details.get('creator_thread')
if child_thread and parent_thread and child_thread != parent_thread:
all_threads.add(child_thread)
all_threads.add(parent_thread)
if child_thread not in parent_map:
parent_map[child_thread] = parent_thread
if event.children:
_find_all_relationships_from_timed_events(event.children, parent_map, all_threads)
def _build_thread_order_list(current_thread: str, children_map: Dict, ordered_list: List, visited_set: set):
"""Performs a depth-first traversal of the hierarchy map to build the ordered list."""
if current_thread in visited_set:
return
ordered_list.append(current_thread)
visited_set.add(current_thread)
for child in sorted(children_map.get(current_thread, [])):
_build_thread_order_list(child, children_map, ordered_list, visited_set)
def _create_perfetto_metadata_events(pid: int, thread_to_tid: Dict) -> List[Dict]:
"""Creates metadata events to name the process and threads in Perfetto."""
metadata = [{"name": "process_name", "ph": "M", "pid": pid, "args": {"name": "Multirotor Simulation"}}]
for name, tid in thread_to_tid.items():
metadata.append({"name": "thread_name", "ph": "M", "pid": pid, "tid": tid, "args": {"name": name}})
return metadata
def _convert_timed_event_to_perfetto(event: TimedEvent, pid: int, base_timestamp_us: float, thread_to_tid: Dict) -> Dict:
"""Converts a single TimedEvent to the Perfetto Trace Event Format."""
start_us = (event.start_time.timestamp() * 1_000_000) - base_timestamp_us
duration_us = event.duration_ms * 1000
return {
"name": event.event_type,
"cat": event.component_id,
"ph": "X",
"ts": start_us,
"dur": duration_us,
"pid": pid,
"tid": thread_to_tid.get(event.thread_name),
"args": event.details
}
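# Example emitted event (illustrative; ph='X' is the Trace Event Format's
# "complete event" phase):
#   {"name": "SPIN", "cat": "motor_1", "ph": "X", "ts": 1234.0,
#    "dur": 2500.0, "pid": 1, "tid": 3, "args": {"tick": 42}}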
def _flatten_events_for_perfetto(events: List[TimedEvent], pid: int, base_timestamp_us: float, thread_to_tid: Dict) -> List[Dict]:
"""Recursively flattens the TimedEvent hierarchy for Perfetto."""
perfetto_events = []
for event in events:
if event.start_time and event.end_time:
perfetto_events.append(_convert_timed_event_to_perfetto(event, pid, base_timestamp_us, thread_to_tid))
if event.children:
perfetto_events.extend(_flatten_events_for_perfetto(event.children, pid, base_timestamp_us, thread_to_tid))
return perfetto_events
def _generate_perfetto_json(root_events: List[TimedEvent], out_stream: TextIO):
"""Generates and writes the Perfetto JSON to the given stream."""
# 1. Build hierarchy from creator_thread attributes
parent_map = {}
all_threads_set = set()
_find_all_relationships_from_timed_events(root_events, parent_map, all_threads_set)
children_map = defaultdict(list)
for child, parent in parent_map.items():
children_map[parent].append(child)
# 2. Create the final ordered list of threads
main_thread_name = 'MainThread'
sim_hierarchy_list = []
visited_set = set()
if 'top_thread' in all_threads_set:
_build_thread_order_list('top_thread', children_map, sim_hierarchy_list, visited_set)
other_root_threads = sorted([t for t in all_threads_set if t not in visited_set and t not in parent_map])
final_ordered_list = [main_thread_name]
final_ordered_list.extend([t for t in sim_hierarchy_list if t != main_thread_name])
final_ordered_list.extend([t for t in other_root_threads if t != main_thread_name])
all_threads = final_ordered_list
# 3. Create thread ID mapping (starting from 1)
thread_to_tid = {name: i + 1 for i, name in enumerate(all_threads) if name}
# 4. Generate Perfetto events
base_timestamp_us = root_events[0].start_time.timestamp() * 1_000_000
pid = 1
metadata_events = _create_perfetto_metadata_events(pid, thread_to_tid)
perfetto_events = _flatten_events_for_perfetto(root_events, pid, base_timestamp_us, thread_to_tid)
# 5. Write to stream
output_data = {'traceEvents': metadata_events + perfetto_events}
json.dump(output_data, out_stream, indent=2)
def analyze_trace_log(log_path: str, output_format: str = 'text', output_file: Optional[str] = None):
"""
Main analysis function. Parses the log and prints a performance report.
Args:
log_path: Path to the simulation trace log file.
output_format: The desired output format. Can be 'text', 'json', or 'json:perfetto'.
output_file: Optional path to write the output to. If None, prints to stdout.
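
    Example (illustrative paths)::

        analyze_trace_log('trace.log', output_format='json:perfetto',
                          output_file='trace.perfetto.json')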
"""
try:
all_steps, open_stacks = _parse_log(log_path)
except FileNotFoundError:
print(f"Error: Log file not found at '{log_path}'")
return
complete_root_events = [event for event in all_steps if event.end_time is not None]
# Determine the output stream
out_stream = open(output_file, 'w', encoding='utf-8') if output_file else sys.stdout
try:
if not complete_root_events:
if output_format.startswith('json'):
json.dump([], out_stream, indent=2)
else: # text
print("Could not find any fully completed simulation steps in the log.", file=out_stream)
return
if output_format == 'json':
json_output = [_convert_event_to_dict(event) for event in complete_root_events]
json.dump(json_output, out_stream, indent=2)
elif output_format == 'json:perfetto':
print("Generating Perfetto-compatible JSON trace...", file=sys.stderr)
_generate_perfetto_json(complete_root_events, out_stream)
else: # text format
print("\n" + "=" * 80, file=out_stream)
print("Performance Analysis of Simulation Step(s)", file=out_stream)
print("=" * 80, file=out_stream)
for event in complete_root_events:
_print_event_hierarchy(event, file=out_stream)
print("-" * 80, file=out_stream)
finally:
if output_file:
out_stream.close()