Source code for ml.engine

from __future__ import annotations
from abc import ABC, abstractmethod
from typing import List, Dict, Set, Callable, Optional, Any, Deque
from collections import deque
import threading
import traceback
from .tracer import Tracer
from .enums import LogLevel, QueueMode, OnFullBehavior
from . import conf

# TODO: Future Enhancements
# 1.  **Deadlock Detection**: Implement a mechanism in the `Part._run` loop to detect
#     when the system is idle but not all stop conditions are met, and no
#     external event sources are active. This would indicate a potential deadlock.
# 2.  **Algebraic Loop Detection**: Before starting the simulation, analyze the
#     connection graph to detect and report any direct feedback loops between
#     behavioral parts that don't have an `EventQueue` to break the cycle.
#     Such loops would cause infinite execution within a single `__step`.
# 3.  **Part/Port Attributes**: Add a dictionary-based `attributes` property to
#     `Part` and `Port` to allow users to attach arbitrary metadata (e.g., units,
#     physical properties, documentation).
# 4.  **Attribute Propagation**: Develop a system for propagating attributes
#     (or checking for their compatibility) across connections. For example,
#     ensuring a port expecting a 'voltage' is not connected to one providing 'pressure'.
# 5.  **JSON Import/Export**: Create a mechanism to serialize a structural `Part`'s
#     topology (inner parts, connections) to a JSON file and to instantiate
#     a `Part` from such a file. This would enable model definition outside of Python.
# 6.  **Diagram Generation**: Add a utility to export the connection graph of a
#     structural `Part` to a format compatible with diagramming tools like
#     Graphviz (DOT file), allowing for automatic visualization of the system
#     architecture.
# 7.  **Connectivity Check**: Add a pre-run check to verify that all `Port`s and
#     `EventQueue`s are connected, preventing runtime errors from dangling interfaces.
# 8.  **Distributed Simulation**: Elaborate a strategy to move a `Part` to a remote
#     host, for instance by defining a "behavioral bridge" that serializes data
#     over a network (e.g., TCP/IP, ZeroMQ) to a corresponding bridge on another machine.
# 9.  **External Code Integration**: Develop a strategy for co-simulation. This could
#     involve generating wrapper code from a `Part`'s `behavior()` method to
#     interface with simulators for other languages (e.g., VHDL, SystemC), or
#     vice-versa, allowing parts of the model to be defined in external environments.

def sequential_execution(scheduled_parts: Set[Part]):
    """
    Run every scheduled part, one after another, on the calling thread.

    `execute()` is invoked exactly once on each part. Because the parts
    arrive as a set, the order in which they run is unspecified.

    Args:
        scheduled_parts: The parts that have been scheduled for execution.
    """
    # Nothing is traced here: per the original notes, scheduling is logged
    # by the caller (`__step`) and `execute()` emits its own trace events.
    for scheduled in scheduled_parts:
        scheduled.execute()
class Port:
    """Represents an input or output port for a Part."""

    IN = conf.DIRECTION_IN
    OUT = conf.DIRECTION_OUT

    # Type alias for the owning object. The only thing a Port requires of
    # its parent is a `get_full_identifier()` method.
    Parent = Any

    class OverwriteError(Exception):
        """Exception raised when set() is called on an already updated Port."""
        pass

    class StaleReadError(Exception):
        """Exception raised when get() is called on a port that has not been updated."""
        pass

    def __init__(self, identifier: str, direction: str):
        """
        Initializes a Port.

        Args:
            identifier: The name of the port.
            direction: The direction, either Port.IN or Port.OUT.

        Raises:
            ValueError: If `direction` is neither Port.IN nor Port.OUT.
        """
        # Enforce the documented contract up front (EventQueue performs the
        # same check), so a bad direction fails at construction time.
        if direction not in (Port.IN, Port.OUT):
            raise ValueError(conf.ERROR_INVALID_PORT_DIRECTION.format(direction))

        self.__identifier = identifier
        self.__direction = direction
        # Until a parent is attached, the full identifier is the local one.
        self.__full_identifier = self.__identifier
        self.__parent: Optional[Port.Parent] = None
        self.__payload: Any = None
        self.__updated: bool = False
        self.__connected_as_slave = False
        # Creation is not logged here; `_log_creation()` is called later,
        # once the full identifier is final.

    def get_identifier(self) -> str:
        """
        Gets the local identifier of the port.

        Returns:
            str: The local identifier of the port.
        """
        return self.__identifier

    def get_direction(self) -> str:
        """
        Gets the direction of the port.

        Returns:
            str: The direction given at construction (Port.IN or Port.OUT).
        """
        return self.__direction

    def get_full_identifier(self) -> str:
        """
        Gets the fully qualified identifier of the port.

        Returns:
            str: The fully qualified identifier, including parent hierarchy.
        """
        return self.__full_identifier

    def get_parent(self) -> Optional[Parent]:
        """
        Gets the parent component of the port.

        Returns:
            Optional[Parent]: The parent object, or None if not set.
        """
        return self.__parent

    def get(self) -> Any:
        """
        Retrieves the payload from the port.

        This is a non-destructive read: the 'updated' flag is left set and
        is only reset through `_clear_update_flag()`.

        Returns:
            Any: The payload currently held by the port.

        Raises:
            Port.StaleReadError: If the port has not been updated since the
                flag was last cleared.
        """
        if not self.is_updated():
            raise Port.StaleReadError(
                conf.ERROR_PORT_STALE_READ.format(self.get_full_identifier()))
        return self.__payload

    def is_updated(self) -> bool:
        """
        Checks if the port's payload has been updated since the last clear.

        Returns:
            bool: True if the port holds new data, False otherwise.
        """
        return self.__updated

    def set(self, payload: Any):
        """
        Sets the payload of the port and marks it as updated.

        Args:
            payload: The new payload for the port.

        Raises:
            Port.OverwriteError: If the port is already marked as updated,
                i.e. the previous payload has not been cleared yet.
        """
        if self.is_updated():
            raise Port.OverwriteError(
                conf.ERROR_PORT_OVERWRITE.format(self.get_full_identifier()))
        self.__payload = payload
        self.__updated = True
        Tracer.log(conf.LOG_LEVEL_DATAFLOW, self.get_full_identifier(),
                   conf.LOG_EVENT_SET_PAYLOAD, {"payload": self.__payload})

    def _clear_update_flag(self):
        """
        Clears the port's updated flag so a new payload can be set.

        The stored payload itself is left untouched; only the flag changes.
        """
        Tracer.log(conf.LOG_LEVEL_DATAFLOW, self.get_full_identifier(),
                   conf.LOG_EVENT_CLEAR_FLAG)
        self.__updated = False

    def _set_connected_as_slave(self):
        """
        Marks the port as connected as a slave, ensuring it's only done once.

        Raises:
            ConnectionError: If the port is already connected as a slave.
        """
        if self.__connected_as_slave:
            raise ConnectionError(
                conf.ERROR_SLAVE_PORT_ALREADY_CONNECTED.format(self.get_full_identifier()))
        self.__connected_as_slave = True

    def _set_parent(self, parent: Parent):
        """
        Sets the parent component and refreshes the full identifier.

        Only the first call has an effect; later calls are ignored.

        Args:
            parent (Parent): The parent object (typically a Part).
        """
        # Compare against None explicitly so a parent that happens to be
        # falsy still counts as "already set".
        if self.__parent is not None:
            return
        self.__parent = parent
        self._update_full_identifier()

    def _log_creation(self):
        """Logs the creation of this component. Called after the full_identifier is finalized."""
        Tracer.log(conf.LOG_LEVEL_CREATE, self.get_full_identifier(),
                   conf.LOG_EVENT_CREATE, {"direction": self.get_direction()})

    def _update_full_identifier(self):
        """Rebuilds the full identifier as '<parent full identifier>.<identifier>'."""
        if self.__parent is not None:
            parent_id = self.__parent.get_full_identifier()
            self.__full_identifier = f"{parent_id}.{self.get_identifier()}"
class EventQueue:
    """
    Represents a queue for storing event payloads, with different behaviors
    for input and output directions.

    An OUT queue holds at most one payload and forwards it to every
    registered interface as soon as it is pushed. An IN queue is a bounded
    buffer that is read in FIFO or LIFO order.
    """

    IN = conf.DIRECTION_IN
    OUT = conf.DIRECTION_OUT

    # Type alias for the owning object. The only thing a queue requires of
    # its parent is a `get_full_identifier()` method.
    Parent = Any

    # Marker for "the OUT slot holds nothing". A dedicated object is used
    # instead of None so that None is a legal payload; otherwise pushing
    # None would leave the queue "empty" and the event would never be
    # transferred.
    __VACANT = object()

    class FullError(Exception):
        """Exception raised when push() is called on a full EventQueue."""
        pass

    class EmptyError(Exception):
        """Exception raised when pop() is called on an empty EventQueue."""
        pass

    def __init__(self, identifier: str, direction: str, size: Optional[int] = None,
                 mode: QueueMode = QueueMode.FIFO):
        """
        Initializes an EventQueue.

        Args:
            identifier: The unique name of the event queue.
            direction: The direction, either EventQueue.IN or EventQueue.OUT.
            size: The maximum number of events for an IN queue. Must be None
                for OUT queues.
            mode: The queueing behavior, either FIFO (default) or LIFO.

        Raises:
            ValueError: If `direction` is invalid, if `size` is given for an
                OUT queue, or if `size` is not an integer >= 1 for an IN queue.
        """
        if direction not in (EventQueue.IN, EventQueue.OUT):
            raise ValueError(conf.ERROR_INVALID_PORT_DIRECTION.format(direction))

        self.__identifier = identifier
        self.__direction = direction
        # Until a parent is attached, the full identifier is the local one.
        self.__full_identifier = self.__identifier
        self.__parent: Optional[EventQueue.Parent] = None
        self.__mode = mode

        if direction == EventQueue.OUT:
            if size is not None:
                raise ValueError(conf.ERROR_INVALID_QUEUE_SIZE_FOR_DIRECTION.format(direction))
            self.__size = 1
        else:  # IN
            if not isinstance(size, int) or size < 1:
                raise ValueError(conf.ERROR_INVALID_QUEUE_SIZE)
            self.__size = size

        self.__payload: Any = EventQueue.__VACANT  # Single slot, OUT queues only
        self.__payloads: Deque[Any] = deque()  # Buffer, IN queues only
        self.__event_interfaces: List[EventInterface] = []
        self.__connected_as_slave = False
        self.__connected_as_master_in_to_in = False
        # Creation is not logged here; `_log_creation()` is called by the
        # owner once the full identifier is final.

    def get_identifier(self) -> str:
        """
        Gets the local identifier of the event queue.

        Returns:
            str: The local identifier of the event queue.
        """
        return self.__identifier

    def get_direction(self) -> str:
        """
        Gets the direction of the event queue.

        Returns:
            str: The direction given at construction (EventQueue.IN or
            EventQueue.OUT).
        """
        return self.__direction

    def get_full_identifier(self) -> str:
        """
        Gets the fully qualified identifier of the event queue.

        Returns:
            str: The fully qualified identifier, including parent hierarchy.
        """
        return self.__full_identifier

    def push(self, payload: Any, overwrite: bool = False):
        """
        Adds an event payload to the queue based on its direction.

        For an OUT queue the payload replaces the current slot content and
        is immediately transferred through every registered interface. For
        an IN queue the payload is appended to the buffer.

        Args:
            payload (Any): The event payload to add. None is a valid payload.
            overwrite (bool): For IN queues, if True and the queue is full,
                one existing item is removed to make room (the newest in
                LIFO mode, the oldest in FIFO mode). For OUT queues the flag
                is forwarded to the connected interfaces.

        Raises:
            EventQueue.FullError: If an IN queue is full and `overwrite` is
                False.
        """
        if self.get_direction() == EventQueue.OUT:
            self.__payload = payload
            Tracer.log(conf.LOG_LEVEL_DATAFLOW, self.get_full_identifier(),
                       conf.LOG_EVENT_PUSH, {"payload": payload})
            # For output queues, immediately transfer the event to all
            # connected interfaces.
            for interface in self.__event_interfaces:
                interface._transfer(overwrite=overwrite)
            return

        if self.is_full():
            if not overwrite:
                raise EventQueue.FullError(
                    conf.ERROR_QUEUE_FULL_WITH_PAYLOAD.format(
                        payload=payload, queue_id=self.get_full_identifier()))

            size_before = len(self.__payloads)
            if self.__mode == QueueMode.LIFO:
                removed_payload = self.__payloads.pop()  # Remove newest
            else:  # FIFO
                removed_payload = self.__payloads.popleft()  # Remove oldest
            Tracer.log(LogLevel.DEBUG, self.get_full_identifier(),
                       conf.LOG_EVENT_OVERWRITE,
                       {"removed": removed_payload, "added": payload})
            self.__payloads.append(payload)
            # The reported "size_before" is the size after the eviction, so
            # the log entry describes the append step on its own.
            Tracer.log(conf.LOG_LEVEL_DATAFLOW, self.get_full_identifier(),
                       conf.LOG_EVENT_PUSH,
                       {"payload": payload, "size_before": size_before - 1,
                        "size_after": len(self.__payloads)})
            return

        size_before = len(self.__payloads)
        self.__payloads.append(payload)
        Tracer.log(conf.LOG_LEVEL_DATAFLOW, self.get_full_identifier(),
                   conf.LOG_EVENT_PUSH,
                   {"payload": payload, "size_before": size_before,
                    "size_after": len(self.__payloads)})

    def peek(self) -> Optional[Any]:
        """
        Gets the payload from an OUT queue without consuming it.

        Because None is itself a valid payload, callers that need to tell
        "empty" apart from "holds None" should check `is_empty()` first.

        Returns:
            Optional[Any]: The payload if the queue is an OUT queue and not
            empty, otherwise None.
        """
        if self.is_empty() or self.get_direction() != EventQueue.OUT:
            return None
        Tracer.log(conf.LOG_LEVEL_DATAFLOW, self.get_full_identifier(),
                   conf.LOG_EVENT_PEEK, {"payload": self.__payload})
        return self.__payload

    def is_empty(self) -> bool:
        """
        Checks if the event queue is empty.

        Returns:
            bool: True if the queue has no items, False otherwise.
        """
        if self.get_direction() == EventQueue.OUT:
            return self.__payload is EventQueue.__VACANT
        return len(self.__payloads) == 0

    def is_full(self) -> bool:
        """
        Checks if the event queue is full.

        Returns:
            bool: True if the queue has reached its capacity, False otherwise.
        """
        if self.get_direction() == EventQueue.OUT:
            return not self.is_empty()
        return len(self.__payloads) >= self.__size

    def clear(self):
        """Removes all event payloads from the queue."""
        Tracer.log(conf.LOG_LEVEL_LIFECYCLE, self.get_full_identifier(), conf.LOG_EVENT_CLEAR)
        if self.get_direction() == EventQueue.OUT:
            self.__payload = EventQueue.__VACANT
        else:
            self.__payloads.clear()

    def pop(self) -> Optional[Any]:
        """
        Removes and returns an event payload from the queue.

        For IN queues the item returned depends on the queue's mode (FIFO or
        LIFO). For OUT queues, this is a destructive read of the single
        payload.

        Returns:
            Optional[Any]: The payload removed from the queue.

        Raises:
            EventQueue.EmptyError: If the queue holds no payload.
        """
        if self.is_empty():
            raise EventQueue.EmptyError(
                conf.ERROR_QUEUE_EMPTY.format(self.get_full_identifier()))

        if self.get_direction() == EventQueue.OUT:
            payload = self.__payload
            self.__payload = EventQueue.__VACANT
            Tracer.log(conf.LOG_LEVEL_DATAFLOW, self.get_full_identifier(),
                       conf.LOG_EVENT_POP, {"payload": payload})
            return payload

        size_before = len(self.__payloads)
        if self.__mode == QueueMode.LIFO:
            payload = self.__payloads.pop()
        else:  # FIFO
            payload = self.__payloads.popleft()
        Tracer.log(conf.LOG_LEVEL_DATAFLOW, self.get_full_identifier(),
                   conf.LOG_EVENT_POP,
                   {"payload": payload, "size_before": size_before,
                    "size_after": len(self.__payloads)})
        return payload

    def _add_interface(self, interface: EventInterface):
        """
        Registers a connecting interface.

        `EventInterface.__init__` calls this on its master queue; an OUT
        queue later invokes `_transfer()` on every registered interface
        from `push()`.

        Args:
            interface (EventInterface): The interface to register.
        """
        self.__event_interfaces.append(interface)

    def _set_connected_as_master(self):
        """
        Marks the queue as connected as a master, enforcing connection policies.

        An IN-type queue can only be a master to one slave to ensure
        unambiguous event routing. OUT queues are not restricted here.

        Raises:
            ConnectionError: If an IN queue is already a master.
        """
        if self.get_direction() == EventQueue.IN:
            if self.__connected_as_master_in_to_in:
                raise ConnectionError(
                    conf.ERROR_IN_QUEUE_MASTER_ALREADY_CONNECTED.format(self.get_full_identifier()))
            self.__connected_as_master_in_to_in = True

    def _set_connected_as_slave(self):
        """
        Marks the queue as connected as a slave, ensuring it's only done once.

        Raises:
            ConnectionError: If the queue is already connected as a slave.
        """
        if self.__connected_as_slave:
            raise ConnectionError(
                conf.ERROR_SLAVE_QUEUE_ALREADY_CONNECTED.format(self.get_full_identifier()))
        self.__connected_as_slave = True

    def _set_parent(self, parent: Parent):
        """
        Sets the parent component and refreshes the full identifier.

        Only the first call has an effect; later calls are ignored.

        Args:
            parent (Parent): The parent object (typically a Part or EventSource).
        """
        # Compare against None explicitly so a parent that happens to be
        # falsy still counts as "already set".
        if self.__parent is not None:
            return
        self.__parent = parent
        self._update_full_identifier()

    def _log_creation(self):
        """Logs the creation of this component.

        Called after the full_identifier is finalized."""
        Tracer.log(conf.LOG_LEVEL_CREATE, self.get_full_identifier(),
                   conf.LOG_EVENT_CREATE,
                   {"direction": self.get_direction(), "size": self.__size})

    def _update_full_identifier(self):
        """Rebuilds the full identifier as '<parent full identifier>.<identifier>'."""
        if self.__parent is not None:
            parent_id = self.__parent.get_full_identifier()
            self.__full_identifier = f"{parent_id}.{self.get_identifier()}"
class EventSource(ABC):
    """
    Abstract base class for components that generate events, typically in a
    separate thread.

    This class provides the core infrastructure for starting, stopping, and
    managing the lifecycle of an event source thread. Subclasses must
    implement the `_run_loop` method, which contains the logic for
    generating events.
    """

    def __init__(self, identifier: str, on_full: OnFullBehavior = OnFullBehavior.FAIL):
        """
        Initializes the EventSource.

        Args:
            identifier (str): The unique name for the event source.
            on_full (OnFullBehavior): The policy for handling a full target
                queue (FAIL, DROP, or OVERWRITE).
        """
        self.__identifier = identifier
        self.__full_identifier = self.__identifier
        self.__on_full_behavior = on_full

        self.__output_queue = EventQueue(conf.DEFAULT_EVENT_SOURCE_OUT_QUEUE_ID, EventQueue.OUT)
        self.__output_queue._set_parent(self)
        # The source's identifier is already final here, so the queue's
        # creation can be logged immediately.
        self.__output_queue._log_creation()

        self.__part_wakeup_signal: Optional[threading.Event] = None
        self.__is_running = False
        self.__thread: Optional[threading.Thread] = None
        self.__exception: Optional[Exception] = None
        self.__stop_event = threading.Event()

    def get_identifier(self) -> str:
        """
        Gets the local identifier of the event source.

        Returns:
            str: The local identifier of the event source.
        """
        return self.__identifier

    def get_full_identifier(self) -> str:
        """
        Gets the fully qualified identifier of the event source.

        Returns:
            str: The fully qualified identifier.
        """
        return self.__full_identifier

    def _get_output_queue(self) -> EventQueue:
        """
        Gets the single output queue for this event source.

        Returns:
            EventQueue: The internal output queue instance.
        """
        return self.__output_queue

    def get_exception(self) -> Optional[Exception]:
        """
        Gets the exception that caused the run loop to terminate, if any.

        Returns:
            Optional[Exception]: The exception object if one occurred,
            otherwise None.
        """
        return self.__exception

    @abstractmethod
    def _run_loop(self):
        """
        The main loop for the event source.

        This method must be implemented by subclasses. The loop should
        periodically check `self.is_running()` or
        `self.stop_event_is_set()` and exit gracefully. The underlying stop
        event is name-mangled and cannot be read directly from a subclass.
        """

    def stop_event_is_set(self) -> bool:
        """
        Checks if the stop event has been set.

        Returns:
            bool: True if the stop event is set, False otherwise.
        """
        return self.__stop_event.is_set()

    def start(self):
        """
        Starts the event source in a new daemon thread.

        Does nothing if the source is already marked as running. Otherwise
        the stop event and any stored exception are reset before the thread
        is launched.
        """
        if self.is_running():
            return
        self.__stop_event.clear()
        self.__exception = None
        self.__is_running = True
        self.__thread = threading.Thread(
            target=self._run,
            daemon=True,
            name=f"{self.get_full_identifier()}{conf.THREAD_NAME_SUFFIX}")
        self.__thread.start()

    def _run(self):
        """
        The internal thread target that wraps the main loop with exception
        handling.

        Any exception escaping `_run_loop()` is logged with its traceback,
        stored for `get_exception()`, and the source is stopped. The
        exception is deliberately not re-raised, which avoids the default
        traceback dump of the thread.
        """
        try:
            self._run_loop()
        except Exception as e:  # Catches all exceptions, including EventQueue.FullError
            tb_str = traceback.format_exc()
            Tracer.log(LogLevel.ERROR, self.get_full_identifier(),
                       conf.LOG_EVENT_EXCEPTION,
                       {"error": str(e), "traceback": tb_str})
            self.__exception = e
            if self.is_running():
                self.stop()
        # NOTE(review): when `_run_loop()` returns normally, the running flag
        # is left set until `stop()` is called - confirm this is intended.
        Tracer.log(conf.LOG_LEVEL_LIFECYCLE, self.get_full_identifier(),
                   conf.LOG_EVENT_THREAD_END)

    def stop(self):
        """
        Stops the event source thread and signals any listeners.

        Clears the running flag, sets the stop event and, if a part has
        registered its wakeup signal, sets that signal as well. Calling this
        on a source that is not running is a no-op.
        """
        if not self.__is_running:
            return
        Tracer.log(conf.LOG_LEVEL_LIFECYCLE, self.get_full_identifier(),
                   conf.LOG_EVENT_STOP_SIGNALLED)
        self.__is_running = False
        self.__stop_event.set()
        if self.__part_wakeup_signal is not None:
            self.__part_wakeup_signal.set()

    def join(self, timeout: Optional[float] = None):
        """
        Waits for the event source thread to finish.

        Args:
            timeout (Optional[float]): The maximum time in seconds to wait
                for the thread to complete.
        """
        if self.__thread:
            self.__thread.join(timeout)
            # NOTE(review): the handle is dropped even if the join timed out
            # and the thread is still alive - confirm this is intended.
            self.__thread = None

    def is_running(self) -> bool:
        """
        Checks if the event source is marked as running.

        Returns:
            bool: True if the source is running, False otherwise.
        """
        return self.__is_running

    def emit(self, payload: Any):
        """
        Emits an event with a given payload.

        The output queue handles the immediate transfer, and this method
        signals any waiting Part on success. The output queue is cleared
        after the attempt, regardless of success.

        Args:
            payload (Any): The payload of the event to emit.

        Raises:
            EventQueue.FullError: If a target queue is full and the policy
                is OnFullBehavior.FAIL.
        """
        # Log the emission attempt, including the policy for handling a full queue.
        Tracer.log(conf.LOG_LEVEL_DATAFLOW, self.get_full_identifier(),
                   conf.LOG_EVENT_EMIT,
                   {"payload": payload, "on_full_policy": self.__on_full_behavior.name})
        try:
            # Only the OVERWRITE policy asks the target queue to evict an item.
            should_overwrite = self.__on_full_behavior == OnFullBehavior.OVERWRITE
            # This push triggers the instantaneous transfer.
            self.__output_queue.push(payload, overwrite=should_overwrite)
            if self.__part_wakeup_signal is not None:
                self.__part_wakeup_signal.set()
        except EventQueue.FullError:
            if self.__on_full_behavior == OnFullBehavior.DROP:
                # Policy is to drop the event. Log it and continue.
                Tracer.log(conf.LOG_LEVEL_LIFECYCLE, self.get_full_identifier(),
                           conf.LOG_EVENT_EMIT_SKIPPED,
                           {"reason": conf.LOG_DETAIL_REASON_TARGET_FULL, "payload": payload})
            elif self.__on_full_behavior == OnFullBehavior.FAIL:
                # Policy is to fail. A bare `raise` propagates the original
                # exception, traceback intact, to the run loop's handler.
                raise
            # With OVERWRITE the target queue evicts instead of raising, so
            # this handler is not expected to run for that policy.
        finally:
            # The event is always removed from the source queue, even if the
            # transfer failed, so it cannot be transferred a second time.
            self.__output_queue.clear()

    def _register_wakeup_signal(self, part: Part):
        """
        Links this source to a part's wakeup signal.

        Args:
            part (Part): The part whose wakeup signal should be set on emit
                and on stop.
        """
        self.__part_wakeup_signal = part._get_wakeup_signal()
class Interface:
    """Connects a master (source) port to a slave (destination) port."""

    def __init__(self, master_port: Port, slave_port: Port):
        """
        Builds the connection and claims the slave port.

        A connection is accepted when it goes from an OUT port to an IN
        port, or when both ports have the same direction.

        Args:
            master_port: The source port for the data transfer.
            slave_port: The destination port for the data transfer.

        Raises:
            ValueError: If the pair of directions is not one of the
                accepted combinations.
        """
        master_dir = master_port.get_direction()
        slave_dir = slave_port.get_direction()

        out_to_in = master_dir == Port.OUT and slave_dir == Port.IN
        same_direction = master_dir == slave_dir
        if not (out_to_in or same_direction):
            raise ValueError(
                conf.ERROR_INVALID_INTERFACE_CONNECTION.format(master_dir, slave_dir))

        # Claim the slave first; this raises if it already has a master.
        slave_port._set_connected_as_slave()
        self.__master_port = master_port
        self.__slave_port = slave_port

    def get_identifier(self) -> str:
        """
        Gets the identifier for this interface.

        Returns:
            str: The same string as `get_full_identifier()`.
        """
        return self.get_full_identifier()

    def get_full_identifier(self) -> str:
        """
        Builds the connection string from the current port identifiers.

        Returns:
            str: `conf.CONNECTION_FORMAT` filled in with the master's and
            the slave's full identifiers.
        """
        master_id = self.__master_port.get_full_identifier()
        slave_id = self.__slave_port.get_full_identifier()
        return conf.CONNECTION_FORMAT.format(master_id=master_id, slave_id=slave_id)

    def get_master_port(self) -> Port:
        """
        Gets the master (source) port of the interface.

        Returns:
            Port: The master port instance.
        """
        return self.__master_port

    def get_slave_port(self) -> Port:
        """
        Gets the slave (destination) port of the interface.

        Returns:
            Port: The slave port instance.
        """
        return self.__slave_port

    def _transfer(self):
        """
        Copies the master's payload to the slave if the master is updated.

        The master port's update flag is NOT cleared here. The caller clears
        it once all transfers for that port are done, which is what makes
        one-to-many connections possible.
        """
        source = self.__master_port
        if not source.is_updated():
            return
        payload = source.get()
        Tracer.log(conf.LOG_LEVEL_DATAFLOW, self.get_full_identifier(),
                   conf.LOG_EVENT_TRANSFER, {"payload": payload})
        self.__slave_port.set(payload)
class EventInterface:
    """Connects a master event queue to a slave event queue."""

    def __init__(self, master_queue: EventQueue, slave_queue: EventQueue):
        """
        Builds the connection and registers it with the master queue.

        The slave must be an IN queue; the master may be an OUT queue
        (broadcast) or an IN queue (routing).

        Args:
            master_queue: The source queue for the event transfer.
            slave_queue: The destination queue for the event transfer.

        Raises:
            ValueError: If the pair of directions is not one of the
                accepted combinations.
        """
        master_dir = master_queue.get_direction()
        slave_dir = slave_queue.get_direction()

        master_ok = master_dir in (EventQueue.OUT, EventQueue.IN)
        if not (master_ok and slave_dir == EventQueue.IN):
            raise ValueError(
                conf.ERROR_INVALID_EVENT_INTERFACE_CONNECTION.format(master_dir, slave_dir))

        master_queue._set_connected_as_master()
        slave_queue._set_connected_as_slave()
        self.__master_queue = master_queue
        self.__slave_queue = slave_queue
        # The master needs a handle on this interface to trigger transfers.
        self.__master_queue._add_interface(self)

    def get_identifier(self) -> str:
        """
        Gets the identifier for this event interface.

        Returns:
            str: The same string as `get_full_identifier()`.
        """
        return self.get_full_identifier()

    def get_full_identifier(self) -> str:
        """
        Builds the connection string from the current queue identifiers.

        Returns:
            str: `conf.CONNECTION_FORMAT` filled in with the master's and
            the slave's full identifiers.
        """
        master_id = self.__master_queue.get_full_identifier()
        slave_id = self.__slave_queue.get_full_identifier()
        return conf.CONNECTION_FORMAT.format(master_id=master_id, slave_id=slave_id)

    def _transfer(self, overwrite: bool = False):
        """
        Transfers an event from the master to the slave queue.

        - OUT -> IN (broadcast): the event is copied; the master keeps it.
        - IN -> IN (routing): the event is moved, unless the slave is full,
          in which case it stays in the master and the transfer is skipped.

        Nothing happens when the master queue is empty.

        Args:
            overwrite (bool): Forwarded to the slave's `push()` for
                OUT -> IN transfers only; ignored for IN -> IN.
        """
        master = self.__master_queue
        slave = self.__slave_queue

        if master.is_empty():
            return

        if master.get_direction() == EventQueue.OUT:
            # Broadcast: peek leaves the payload in place for other
            # interfaces. The push is not guarded by a fullness check, so
            # it may raise EventQueue.FullError to the emitter.
            payload = master.peek()
            Tracer.log(conf.LOG_LEVEL_DATAFLOW, self.get_full_identifier(),
                       conf.LOG_EVENT_TRANSFER_BROADCAST, {"payload": payload})
            slave.push(payload, overwrite=overwrite)
            return

        # Routing: only take the event out of the master once the slave is
        # known to have room.
        if slave.is_full():
            Tracer.log(conf.LOG_LEVEL_EXECUTION, self.get_full_identifier(),
                       conf.LOG_EVENT_TRANSFER_SKIPPED,
                       {"reason": conf.LOG_DETAIL_REASON_SLAVE_FULL})
            return

        payload = master.pop()
        Tracer.log(conf.LOG_LEVEL_DATAFLOW, self.get_full_identifier(),
                   conf.LOG_EVENT_TRANSFER_ROUTE, {"payload": payload})
        slave.push(payload)
[docs] class Part(ABC): """ Represents a component in the system. A Part can be structural (composed of other parts) or behavioral (defined by a Python method). """ STRUCTURAL = conf.COMPONENT_TYPE_STRUCTURAL BEHAVIORAL = conf.COMPONENT_TYPE_BEHAVIORAL
[docs] class NotRunnableError(Exception): """ Exception raised when the main run loop is attempted on a behavioral part. The simulation loop can only be started on structural parts. """ pass
[docs] class BehaviorNotAllowedError(Exception): """Exception raised when behavior() is called on a structural part.""" pass
[docs] def get_identifier(self) -> str: """ Gets the local identifier of the part. Returns: str: The local identifier of the part. """ return self.__identifier
[docs] def get_full_identifier(self) -> str: """ Gets the fully qualified identifier of the part. Returns: str: The fully qualified identifier, including parent hierarchy. """ return self.__full_identifier
def __init__(self, identifier: str,
             execution_strategy: Optional[Callable[[Set[Part]], None]] = None,
             ports: Optional[List[Port]] = None,
             parts: Optional[Dict[str, Part]] = None,
             event_queues: Optional[List[EventQueue]] = None,
             scheduling_condition: Optional[Callable[[Part], bool]] = None):
    """
    Build a part and adopt its ports, inner parts and event queues.

    The part is structural when a non-empty ``parts`` mapping is supplied
    and behavioral otherwise.

    Args:
        identifier: Local name of the part.
        execution_strategy: Callable that executes a set of scheduled
            inner parts. Required for structural parts and forbidden for
            behavioral ones.
        ports: Ports owned by this part.
        parts: Inner parts keyed by identifier. The mapping object is
            stored as given, not copied.
        event_queues: Input event queues owned by this part.
        scheduling_condition: Optional predicate that decides whether the
            part is runnable. When omitted, a default rule is applied.

    Raises:
        ValueError: If an event queue is not an input queue, if a
            structural part has no execution strategy, or if a behavioral
            part is given one.
        TypeError: If a structural part's class overrides ``behavior``.
    """
    # Identity and hierarchy.
    self.__identifier = identifier
    self.__full_identifier: str = self.__identifier
    self.__parent: Optional[Part] = None

    # Run-loop bookkeeping.
    self.__wakeup_signal = threading.Event()
    self.__thread: Optional[threading.Thread] = None
    self.__exception: Optional[Exception] = None
    self.__creation_logged = False
    self.__scheduling_condition = scheduling_condition
    self.__hooks = {'init': [], 'term': []}

    # Ports are indexed by their identifier and adopted immediately.
    self.__ports = {}
    for port in ports or ():
        self.__ports[port.get_identifier()] = port
        port._set_parent(self)

    # Inner parts: the caller's mapping is kept when it is non-empty.
    self.__parts = parts if parts else {}
    for child in self.__parts.values():
        child._set_parent(self)

    self.__interfaces: List[Interface] = []
    self.__event_interfaces: List[EventInterface] = []

    # A part may only own input queues.
    self.__event_queues = {}
    for queue in event_queues or ():
        if queue.get_direction() != EventQueue.IN:
            raise ValueError(conf.ERROR_PART_CAN_ONLY_HAVE_INPUT_QUEUES.format(self.get_identifier()))
        self.__event_queues[queue.get_identifier()] = queue
        queue._set_parent(self)

    # Classify the part and validate the matching configuration.
    if not parts:
        self.__description = Part.BEHAVIORAL
        if execution_strategy:
            raise ValueError(conf.ERROR_BEHAVIORAL_PART_WITH_EXECUTION_STRATEGY)
        self.__execution_strategy = None
    else:
        self.__description = Part.STRUCTURAL
        if type(self).behavior is not Part.behavior:
            raise TypeError(conf.ERROR_STRUCTURAL_PART_OVERRIDES_BEHAVIOR.format(identifier))
        if not execution_strategy:
            raise ValueError(conf.ERROR_STRUCTURAL_PART_MISSING_EXECUTION_STRATEGY)
        self.__execution_strategy = execution_strategy
        self.__is_running = False


def _set_parent(self, parent: Part):
    """
    Attach this part to its parent and refresh the qualified names.

    A parent can be assigned only once; later calls are silently ignored.

    Args:
        parent (Part): The part that contains this one.
    """
    if not self.__parent:
        self.__parent = parent
        self._update_full_identifier()


def _update_full_identifier(self):
    """Recompute the qualified name and push the update to all children."""
    if self.__parent:
        self.__full_identifier = f"{self.__parent.get_full_identifier()}.{self.get_identifier()}"
    # Snapshot the children first, then cascade: ports, inner parts, queues.
    children = [
        *self.__ports.values(),
        *self.__parts.values(),
        *self.__event_queues.values(),
    ]
    for child in children:
        child._update_full_identifier()


def _log_creation(self):
    """Emit a creation log entry for this part and for everything it owns."""
    Tracer.log(conf.LOG_LEVEL_CREATE, self.get_full_identifier(),
               conf.LOG_EVENT_CREATE, {"type": self.__description})
    # Order: this part, then its ports and queues, then its inner parts.
    leaf_components = [*self.__ports.values(), *self.__event_queues.values()]
    for leaf in leaf_components:
        leaf._log_creation()
    for child in self.__parts.values():
        child._log_creation()


def _transfer_and_clear_ports(self):
    """
    Fan out data from every updated master port to all of its slaves.

    Interfaces are grouped by master port. For each master port that
    reports an update, every interface of the group is transferred first,
    and only then is the master's update flag cleared.
    """
    Tracer.log(conf.LOG_LEVEL_EXECUTION, self.get_full_identifier(),
               conf.LOG_EVENT_TRANSFER_PORTS)

    links_by_master = {}
    for link in self.__interfaces:
        links_by_master.setdefault(link.get_master_port(), []).append(link)

    for source_port, links in links_by_master.items():
        if not source_port.is_updated():
            continue
        for link in links:
            link._transfer()
        # The flag is cleared once, after all slaves have been served.
        source_port._clear_update_flag()


def _get_wakeup_signal(self) -> threading.Event:
    """
    Return the event object used to wake this part's run loop.

    Returns:
        threading.Event: The part's wake-up signal.
    """
    signal = self.__wakeup_signal
    return signal
def connect(self, master_port: Port, slave_port: Port):
    """
    Create an interface from one port to another and register it.

    Args:
        master_port (Port): Port the data is taken from.
        slave_port (Port): Port the data is delivered to.
    """
    link = Interface(master_port, slave_port)
    endpoints = {
        "master": master_port.get_full_identifier(),
        "slave": slave_port.get_full_identifier(),
    }
    Tracer.log(conf.LOG_LEVEL_CONNECT, self.get_full_identifier(),
               conf.LOG_EVENT_CONNECT_PORT, endpoints)
    self.__interfaces.append(link)
def connect_event_source(self, source: EventSource, target_queue_identifier: str):
    """
    Wire an external event source to one of this part's input queues.

    The part is registered with the source for wake-up signalling, and
    the source's output queue is connected to the named input queue.

    Args:
        source (EventSource): The event source to attach.
        target_queue_identifier (str): Identifier of the receiving input
            queue on this part.

    Raises:
        KeyError: If this part has no queue with the given identifier.
    """
    destination = self.get_event_queue(target_queue_identifier)
    source._register_wakeup_signal(self)
    origin = source._get_output_queue()
    self.connect_event_queue(origin, destination)
    Tracer.log(conf.LOG_LEVEL_CONNECT, self.get_full_identifier(),
               conf.LOG_EVENT_CONNECT_EVENT_SRC,
               {"source": source.get_full_identifier(),
                "target_queue": destination.get_full_identifier()})
def connect_event_queue(self, master_queue: EventQueue, slave_queue: EventQueue):
    """
    Create an event interface between two queues and register it.

    The slave queue has to be one of this part's own queues or a queue of
    one of its direct inner parts.

    Args:
        master_queue (EventQueue): Queue the events are taken from.
        slave_queue (EventQueue): Queue the events are delivered to.

    Raises:
        ValueError: If the slave queue belongs neither to this part nor
            to a direct inner part.
    """
    slave_is_reachable = slave_queue in self.__event_queues.values()

    if not slave_is_reachable:
        for child in self.__parts.values():
            try:
                # Match on identity, not just on the queue's name.
                candidate = child.get_event_queue(slave_queue.get_identifier())
            except KeyError:
                # This child has no queue with that name; try the next one.
                continue
            if candidate is slave_queue:
                slave_is_reachable = True
                break

    if not slave_is_reachable:
        raise ValueError(conf.ERROR_SLAVE_EVENT_QUEUE_NOT_IN_PART)

    link = EventInterface(master_queue, slave_queue)
    Tracer.log(conf.LOG_LEVEL_CONNECT, self.get_full_identifier(),
               conf.LOG_EVENT_CONNECT_EVENT_Q,
               {"master": master_queue.get_full_identifier(),
                "slave": slave_queue.get_full_identifier()})
    self.__event_interfaces.append(link)
def get_event_queue(self, identifier: str) -> EventQueue:
    """
    Look up one of this part's event queues by name.

    Args:
        identifier (str): Identifier of the wanted queue.

    Returns:
        EventQueue: The queue registered under that identifier.

    Raises:
        KeyError: If no queue with that identifier exists. The message
            names the identifier and this part.
    """
    registry = self.__event_queues
    try:
        found = registry[identifier]
    except KeyError:
        raise KeyError(conf.ERROR_EVENT_QUEUE_NOT_FOUND.format(identifier, self.get_full_identifier()))
    return found
def get_port(self, identifier: str) -> Port:
    """
    Look up one of this part's ports by name.

    Args:
        identifier (str): Identifier of the wanted port.

    Returns:
        Port: The port registered under that identifier.

    Raises:
        KeyError: If no port with that identifier exists. The message
            names the identifier and this part.
    """
    registry = self.__ports
    try:
        found = registry[identifier]
    except KeyError:
        raise KeyError(conf.ERROR_PORT_NOT_FOUND.format(identifier, self.get_full_identifier()))
    return found
def get_ports(self, direction: str) -> List[Port]:
    """
    Collect this part's ports that have the given direction.

    Args:
        direction (str): Either ``Port.IN`` or ``Port.OUT``.

    Returns:
        List[Port]: The matching ports, in registration order.

    Raises:
        ValueError: If ``direction`` is neither ``Port.IN`` nor ``Port.OUT``.
    """
    accepted = [Port.IN, Port.OUT]
    if direction not in accepted:
        raise ValueError(conf.ERROR_INVALID_PORT_DIRECTION.format(direction))

    matching = []
    for port in self.__ports.values():
        if port.get_direction() == direction:
            matching.append(port)
    return matching
def get_part(self, identifier: str) -> Part:
    """
    Look up a direct inner part by name.

    Args:
        identifier (str): Identifier of the wanted inner part.

    Returns:
        Part: The inner part registered under that identifier.

    Raises:
        KeyError: If no inner part with that identifier exists. The
            message names the identifier and this part.
    """
    registry = self.__parts
    try:
        found = registry[identifier]
    except KeyError:
        raise KeyError(conf.ERROR_PART_NOT_FOUND.format(identifier, self.get_full_identifier()))
    return found
def get_parts(self) -> List[Part]:
    """
    Return the direct inner parts of this part.

    Returns:
        List[Part]: A new list holding every inner part, in registration
        order. The list is empty when there are no inner parts.
    """
    children = [*self.__parts.values()]
    return children
def get_event_queues(self) -> List[EventQueue]:
    """
    Return the event queues owned by this part.

    Returns:
        List[EventQueue]: A new list holding every registered queue, in
        registration order.
    """
    queues = [*self.__event_queues.values()]
    return queues
def get_interfaces(self) -> List[Interface]:
    """
    Return the data port interfaces (connections) registered on this part.

    A snapshot is returned, not the internal list, so callers cannot add
    or remove connections behind the part's back. This matches
    ``get_parts()`` and ``get_event_queues()``, which also hand out fresh
    lists. Use ``connect()`` to register a new interface.

    Returns:
        List[Interface]: A new list holding every registered interface,
        in registration order.
    """
    return list(self.__interfaces)
def get_exception(self) -> Optional[Exception]:
    """
    Return the exception recorded by the run loop, if there is one.

    Returns:
        Optional[Exception]: The stored exception, or None when nothing
        has been recorded.
    """
    recorded = self.__exception
    return recorded
def add_hook(self, hook_type: str, func: Callable):
    """
    Register a function to be called at a lifecycle event.

    Hooks are meant for setting up or tearing down resources that live
    outside the framework's core logic, such as connecting to a physics
    engine or closing plot windows.

    Args:
        hook_type: The kind of hook, either 'init' or 'term'.
        func: The callable to register. Any callable is accepted,
            including ones without a ``__name__`` attribute (for example
            ``functools.partial`` objects or callable instances).

    Raises:
        ValueError: If ``hook_type`` is not a known hook type.
    """
    if hook_type not in self.__hooks:
        raise ValueError(conf.ERROR_INVALID_HOOK_TYPE.format(hook_type))

    # Not every callable has __name__, so fall back to repr() for the log
    # entry instead of failing the registration with an AttributeError.
    try:
        hook_name = func.__name__
    except AttributeError:
        hook_name = repr(func)

    Tracer.log(conf.LOG_LEVEL_LIFECYCLE, self.get_full_identifier(),
               conf.LOG_EVENT_ADD_HOOK,
               {"type": hook_type, "function": hook_name})
    self.__hooks[hook_type].append(func)
def init(self):
    """
    Run the 'init' hooks of this part and then of all its inner parts.

    The part's own hooks run first, in registration order, and the call
    then recurses into each inner part (top-down). It is intended to be
    called once on the top-level part before the simulation starts.
    """
    own_name = self.get_full_identifier()
    Tracer.log(conf.LOG_LEVEL_LIFECYCLE, own_name, conf.LOG_EVENT_INIT_START)

    for callback in self.__hooks['init']:
        callback()

    for child in self.__parts.values():
        child.init()

    Tracer.log(conf.LOG_LEVEL_LIFECYCLE, self.get_full_identifier(), conf.LOG_EVENT_INIT_END)
def term(self):
    """
    Run the 'term' hooks of this part and then of all its inner parts.

    The part's own hooks run first, in registration order, and the call
    then recurses into each inner part (top-down). It is intended to be
    called once on the top-level part after the simulation has finished.
    """
    own_name = self.get_full_identifier()
    Tracer.log(conf.LOG_LEVEL_LIFECYCLE, own_name, conf.LOG_EVENT_TERM_START)

    for callback in self.__hooks['term']:
        callback()

    for child in self.__parts.values():
        child.term()

    Tracer.log(conf.LOG_LEVEL_LIFECYCLE, self.get_full_identifier(), conf.LOG_EVENT_TERM_END)
def behavior(self):
    """
    Hook that holds the logic of a behavioral part.

    Subclasses of behavioral parts override this method. The base
    implementation does nothing for a behavioral part.

    Raises:
        BehaviorNotAllowedError: If this part is structural.
    """
    is_structural = self.__description == Part.STRUCTURAL
    if is_structural:
        raise self.BehaviorNotAllowedError(conf.ERROR_BEHAVIOR_ON_STRUCTURAL_PART)
    # Deliberately empty otherwise: behavioral subclasses supply the body.
def trace_log(self, message: str, details: Optional[dict] = None):
    """
    Write a user-defined entry to the global tracer.

    This is the intended way for user-defined parts to produce trace
    output from within their behavior.

    Args:
        message (str): Text of the log entry, stored under "message".
        details (Optional[dict]): Extra key-value pairs merged into the
            entry. A "message" key in here overrides ``message``.
    """
    extra = details or {}
    entry = {"message": message, **extra}
    Tracer.log(LogLevel.INFO, self.get_full_identifier(), conf.LOG_EVENT_USER_LOG, entry)
def __wait_for_events(self):
    """
    Block until an input queue holds an event or a wake-up arrives.

    When any of this part's event queues is already non-empty, the call
    returns at once. Otherwise it waits on the part's wake-up signal and
    clears the signal afterwards, ready for the next wait.
    """
    event_pending = any(not queue.is_empty() for queue in self.get_event_queues())
    if event_pending:
        # Something is already queued, so there is nothing to wait for.
        Tracer.log(conf.LOG_LEVEL_EXECUTION, self.get_full_identifier(),
                   conf.LOG_EVENT_WAIT_SKIPPED)
        return

    Tracer.log(conf.LOG_LEVEL_EXECUTION, self.get_full_identifier(),
               conf.LOG_EVENT_WAIT_START)
    signal = self.__wakeup_signal
    signal.wait()
    signal.clear()  # Reset so the next cycle blocks again.
    Tracer.log(conf.LOG_LEVEL_EXECUTION, self.get_full_identifier(),
               conf.LOG_EVENT_WAIT_END)
def start(self, stop_condition: Callable[[Part], bool]):
    """
    Launch the part's run loop in a new thread.

    Only structural parts can be started. On the first start, the
    creation of the part and of everything it owns is logged.

    Args:
        stop_condition: Callable that receives the part and returns True
            when the loop should stop.

    Raises:
        NotRunnableError: If the part is behavioral.
        RuntimeError: If a run thread of this part is still alive.
    """
    if self.__description != Part.STRUCTURAL:
        raise self.NotRunnableError(conf.ERROR_START_ON_BEHAVIORAL_PART)

    still_active = self.__is_running and self.__thread and self.__thread.is_alive()
    if still_active:
        raise RuntimeError(conf.ERROR_PART_ALREADY_RUNNING.format(self.get_full_identifier()))

    # The creation log is written only once, however often start() is called.
    if not self.__creation_logged:
        self._log_creation()
        self.__creation_logged = True

    self.__is_running = True
    worker = threading.Thread(
        target=self._run,
        args=(stop_condition,),
        name=f"{self.get_full_identifier()}_thread",
    )
    self.__thread = worker
    worker.start()
def join(self, timeout: Optional[float] = None):
    """
    Wait for the part's run thread to complete.

    The thread handle and the running flag are reset only once the thread
    has finished. If ``timeout`` expires while the thread is still alive,
    the part stays marked as running, so ``join()`` can be called again
    and ``start()`` keeps refusing to launch a second run loop.

    Args:
        timeout (Optional[float]): Maximum time in seconds to wait.
            None waits until the thread finishes.
    """
    worker = self.__thread
    if not worker:
        return

    worker.join(timeout)
    # Thread.join() returns None whether or not it timed out, so
    # is_alive() is the only way to tell whether the thread has ended.
    if worker.is_alive():
        return

    self.__thread = None
    self.__is_running = False
def _is_runnable(self) -> bool:
    """
    Tell the parent whether this part should be scheduled.

    A scheduling condition supplied at construction decides on its own,
    whatever the kind of part. Without one, the part is runnable when any
    input port is updated or any event queue holds an event.

    Returns:
        bool: True if the part should be scheduled, False otherwise.
    """
    custom_condition = self.__scheduling_condition
    if custom_condition:
        return custom_condition(self)

    # Default rule: any pending input makes the part runnable.
    has_fresh_port = any(port.is_updated() for port in self.get_ports(Port.IN))
    if has_fresh_port:
        return True
    return any(not queue.is_empty() for queue in self.get_event_queues())


def __step(self) -> bool:
    """
    Run one scheduling and execution cycle over the inner parts.

    Runnable inner parts are collected, executed through the execution
    strategy, and the port data is then transferred. This repeats until
    no inner part is runnable any more, so data flowing between inner
    parts is handled within the same step.

    Returns:
        bool: True if the step was idle (nothing was ever scheduled),
        False otherwise.
    """
    Tracer.log(conf.LOG_LEVEL_EXECUTION, self.get_full_identifier(),
               conf.LOG_EVENT_STEP_START)

    was_idle = True
    while True:
        # Children that need to run in this sub-step.
        ready = {child for child in self.get_parts() if child._is_runnable()}
        if not ready:
            break
        was_idle = False

        ready_names = sorted({child.get_full_identifier() for child in ready})
        Tracer.log(conf.LOG_LEVEL_LIFECYCLE, self.get_full_identifier(),
                   conf.LOG_EVENT_STEP_SCHEDULE, {"scheduled": ready_names})

        self.__execution_strategy(ready)
        self._transfer_and_clear_ports()

    Tracer.log(conf.LOG_LEVEL_EXECUTION, self.get_full_identifier(),
               conf.LOG_EVENT_STEP_END, {"idle": was_idle})
    return was_idle


def _run(self, stop_condition: Callable[[Part], bool]):
    """
    Execution loop of a structural part, used as the run thread's target.

    It is started through ``start()`` and is not meant to be called
    directly. Any exception raised in the loop is logged and stored on
    the part instead of being re-raised.

    Args:
        stop_condition (Callable[[Part], bool]): Callable that receives
            the part and returns True when the loop should stop.
    """
    Tracer.log(conf.LOG_LEVEL_LIFECYCLE, self.get_full_identifier(),
               conf.LOG_EVENT_RUN_LOOP_START)
    try:
        if self.__description != Part.STRUCTURAL:
            raise self.NotRunnableError(conf.ERROR__RUN_ON_BEHAVIORAL_PART)

        while not stop_condition(self):
            if not self.__step():
                # Work was done, so go straight to the next cycle.
                continue

            # Idle: either finish now or block until an event arrives.
            if stop_condition(self):
                Tracer.log(conf.LOG_LEVEL_LIFECYCLE, self.get_full_identifier(),
                           conf.LOG_EVENT_STOP_CONDITION_MET_IDLE)
                break

            self.__wait_for_events()
            # After waking up, hand the captured events to the inner parts.
            Tracer.log(conf.LOG_LEVEL_EXECUTION, self.get_full_identifier(),
                       conf.LOG_EVENT_WAKEUP_TRANSFER)
            for event_link in self.__event_interfaces:
                event_link._transfer()
        else:
            # Reached only when the loop condition itself ended the loop.
            Tracer.log(conf.LOG_LEVEL_LIFECYCLE, self.get_full_identifier(),
                       conf.LOG_EVENT_STOP_CONDITION_MET)
    except Exception as error:
        # Thread boundary: record the failure rather than re-raise, in
        # line with the EventSource exception handling.
        trace_text = traceback.format_exc()
        Tracer.log(LogLevel.ERROR, self.get_full_identifier(),
                   conf.LOG_EVENT_EXCEPTION,
                   {"error": str(error), "traceback": trace_text})
        self.__exception = error

    Tracer.log(conf.LOG_LEVEL_LIFECYCLE, self.get_full_identifier(),
               conf.LOG_EVENT_RUN_LOOP_END)
def execute(self):
    """
    Run a single cycle of the part.

    A behavioral part calls ``behavior()`` and then clears the update
    flag of every updated input port, even if ``behavior()`` raised. A
    structural part forwards incoming events and input-port data to its
    inner parts and then performs one internal step.
    """
    Tracer.log(conf.LOG_LEVEL_EXECUTION, self.get_full_identifier(),
               conf.LOG_EVENT_EXECUTE_START)

    if self.__description != Part.STRUCTURAL:
        try:
            self.behavior()
        finally:
            # Inputs count as consumed once behavior() has run. Clearing
            # the flags stops the part from being re-scheduled forever on
            # the same data.
            for port in self.get_ports(Port.IN):
                if port.is_updated():
                    port._clear_update_flag()
    else:
        # 1. Move incoming events from parent queues to child queues.
        for event_link in self.__event_interfaces:
            event_link._transfer()

        # 2. Push data from this part's own input ports to the children
        #    (the IN -> IN connections at the boundary).
        for port in self.get_ports(Port.IN):
            if not port.is_updated():
                continue
            for link in self.__interfaces:
                if link.get_master_port() is port:
                    link._transfer()
            # All children are served, so the input is consumed here.
            port._clear_update_flag()

        # 3. Run one cycle of internal scheduling and execution.
        self.__step()

    Tracer.log(conf.LOG_LEVEL_EXECUTION, self.get_full_identifier(),
               conf.LOG_EVENT_EXECUTE_END)