Engine API

This module contains the core simulation engine.

class ml.engine.EventInterface(master_queue: EventQueue, slave_queue: EventQueue)

Bases: object

Represents a connection between two event queues.

get_full_identifier() -> str

Dynamically generates the identifier string for this event interface.

Returns:

The fully qualified connection string (e.g., “part.q_out->part2.q_in”).

Return type:

str

get_identifier() -> str

Gets the identifier for this event interface.

Returns:

The fully qualified connection string.

Return type:

str
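As an illustration of the identifier format described above, the following sketch composes a connection string from two fully qualified identifiers. It is not the ml.engine implementation: the `Named` class and the `connection_string` helper are hypothetical, and the dot separator between hierarchy levels is inferred from the documented example string.

```python
class Named:
    """Hypothetical stand-in for any object with a parent hierarchy."""

    def __init__(self, identifier, parent=None):
        self._identifier = identifier
        self._parent = parent

    def get_identifier(self):
        return self._identifier

    def get_full_identifier(self):
        # Assumption: hierarchy levels are joined with a dot.
        if self._parent is None:
            return self._identifier
        return f"{self._parent.get_full_identifier()}.{self._identifier}"


def connection_string(master, slave):
    """Build a 'master->slave' string from two fully qualified identifiers."""
    return f"{master.get_full_identifier()}->{slave.get_full_identifier()}"


q_out = Named("q_out", parent=Named("part"))
q_in = Named("q_in", parent=Named("part2"))
print(connection_string(q_out, q_in))  # part.q_out->part2.q_in
```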

class ml.engine.EventQueue(identifier: str, direction: str, size: int | None = None, mode: QueueMode = QueueMode.FIFO)

Bases: object

Represents a queue for storing event payloads, with different behaviors for input and output directions.

exception EmptyError

Bases: Exception

Exception raised when pop() is called on an empty EventQueue.

exception FullError

Bases: Exception

Exception raised when push() is called on a full EventQueue.

IN = 'input'

OUT = 'output'

Parent = typing.Any

clear()

Removes all event payloads from the queue.

get_direction() -> str

Gets the direction of the event queue.

Returns:

The direction of the event queue (e.g., ‘input’ or ‘output’).

Return type:

str

get_full_identifier() -> str

Gets the fully qualified identifier of the event queue.

Returns:

The fully qualified identifier, including parent hierarchy.

Return type:

str

get_identifier() -> str

Gets the local identifier of the event queue.

Returns:

The local identifier of the event queue.

Return type:

str

is_empty() -> bool

Checks if the event queue is empty.

Returns:

True if the queue has no items, False otherwise.

Return type:

bool

is_full() -> bool

Checks if the event queue is full.

Returns:

True if the queue has reached its capacity, False otherwise.

Return type:

bool

peek() -> Any | None

Gets the payload from an OUT queue without consuming it.

Returns:

The payload if the queue is an OUT queue and not empty, otherwise None.

Return type:

Optional[Any]

pop() -> Any | None

Removes and returns an event payload from the queue.

For IN queues, the item returned depends on the queue’s mode (FIFO or LIFO). For OUT queues, this is a destructive read of the single payload.

Returns:

The payload removed from the queue.

Return type:

Optional[Any]

push(payload: Any, overwrite: bool = False)

Adds an event payload to the queue based on its direction.

Parameters:
  • payload (Any) – The event payload to add.

  • overwrite (bool) – For IN queues, if True, the pushed payload replaces an existing item when the queue is full.
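The IN-queue semantics documented above (bounded size, FIFO or LIFO retrieval, FullError, EmptyError, and overwrite) can be sketched with a small stand-in. This is not the ml.engine implementation: `ToyQueue` and the local `QueueMode` enum are hypothetical, and two behaviors are assumptions made for illustration, namely that `size=None` means unbounded and that `overwrite=True` discards the oldest item.

```python
from collections import deque
from enum import Enum


class QueueMode(Enum):
    """Local placeholder enum; the member values are arbitrary."""
    FIFO = "fifo"
    LIFO = "lifo"


class ToyQueue:
    """Hypothetical stand-in for an IN-direction event queue."""

    class EmptyError(Exception):
        pass

    class FullError(Exception):
        pass

    def __init__(self, size=None, mode=QueueMode.FIFO):
        if size is not None and size < 1:
            raise ValueError("size must be positive or None")
        self._items = deque()
        self._size = size  # assumption: None means unbounded
        self._mode = mode

    def is_empty(self):
        return len(self._items) == 0

    def is_full(self):
        return self._size is not None and len(self._items) >= self._size

    def push(self, payload, overwrite=False):
        if self.is_full():
            if not overwrite:
                raise ToyQueue.FullError("queue is full")
            # Assumption: overwriting discards the oldest item to make room.
            self._items.popleft()
        self._items.append(payload)

    def pop(self):
        if self.is_empty():
            raise ToyQueue.EmptyError("queue is empty")
        if self._mode is QueueMode.FIFO:
            return self._items.popleft()  # oldest item first
        return self._items.pop()  # newest item first
```

With this sketch, a caller can either check is_full() before pushing or catch FullError and decide whether to retry with overwrite=True.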

class ml.engine.EventSource(identifier: str, on_full: OnFullBehavior = OnFullBehavior.FAIL)

Bases: ABC

Abstract base class for components that generate events, typically in a separate thread.

This class provides the core infrastructure for starting, stopping, and managing the lifecycle of an event source thread. Subclasses must implement the _run_loop method, which contains the logic for generating events.

emit(payload: Any)

Emits an event with a given payload.

The output queue handles the immediate transfer, and this method signals any waiting Part. The output queue is cleared after the attempt, regardless of success.

Parameters:

payload (Any) – The payload of the event to emit.

get_exception() -> Exception | None

Gets the exception that caused the run loop to terminate, if any.

Returns:

The exception object if one occurred, otherwise None.

Return type:

Optional[Exception]

get_full_identifier() -> str

Gets the fully qualified identifier of the event source.

Returns:

The fully qualified identifier.

Return type:

str

get_identifier() -> str

Gets the local identifier of the event source.

Returns:

The local identifier of the event source.

Return type:

str

is_running() -> bool

Checks if the event source thread is running.

Returns:

True if the source is running, False otherwise.

Return type:

bool

join(timeout: float | None = None)

Waits for the event source thread to finish.

start()

Starts the event source in a new thread.

stop()

Stops the event source thread and signals any listeners.

stop_event_is_set() -> bool

Checks if the stop event has been set.

Returns:

True if the stop event is set, False otherwise.

Return type:

bool
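The lifecycle described for EventSource (a subclass supplies _run_loop, the base class runs it in a thread and records any exception) might look roughly like the following. This is a self-contained sketch built on the standard threading module, not the ml.engine code: `ToySource` and `Counter` are hypothetical names, the `emitted` list stands in for the real output queue, and the on_full parameter is not modeled.

```python
import threading
from abc import ABC, abstractmethod


class ToySource(ABC):
    """Hypothetical stand-in mirroring the documented lifecycle methods."""

    def __init__(self, identifier):
        self._identifier = identifier
        self._stop_event = threading.Event()
        self._thread = None
        self._exception = None
        self.emitted = []  # assumption: stands in for the output queue

    @abstractmethod
    def _run_loop(self):
        """Generate events until the stop event is set."""

    def _run(self):
        try:
            self._run_loop()
        except Exception as exc:
            self._exception = exc  # kept for get_exception()

    def emit(self, payload):
        self.emitted.append(payload)

    def start(self):
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def stop(self):
        self._stop_event.set()

    def stop_event_is_set(self):
        return self._stop_event.is_set()

    def join(self, timeout=None):
        if self._thread is not None:
            self._thread.join(timeout)

    def is_running(self):
        return self._thread is not None and self._thread.is_alive()

    def get_exception(self):
        return self._exception


class Counter(ToySource):
    """Emits 0, 1, 2 and then fails, to show exception capture."""

    def _run_loop(self):
        for i in range(3):
            if self.stop_event_is_set():
                return
            self.emit(i)
        raise RuntimeError("simulated failure")
```

Checking stop_event_is_set() inside the loop is what lets stop() end the thread cooperatively; after join() returns, get_exception() shows whether the loop ended normally or with an error.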

class ml.engine.Interface(master_port: Port, slave_port: Port)

Bases: object

Represents a connection between two ports, a master and a slave.

get_full_identifier() -> str

Dynamically generates the identifier string for this interface.

Returns:

The fully qualified connection string (e.g., “part.out->part2.in”).

Return type:

str

get_identifier() -> str

Gets the identifier for this interface.

Returns:

The fully qualified connection string.

Return type:

str

get_master_port() -> Port

Gets the master (source) port of the interface.

Returns:

The master port instance.

Return type:

Port

get_slave_port() -> Port

Gets the slave (destination) port of the interface.

Returns:

The slave port instance.

Return type:

Port

class ml.engine.Part(identifier: str, execution_strategy: Callable[[Set[Part]], None] | None = None, ports: List[Port] | None = None, parts: Dict[str, Part] | None = None, event_queues: List[EventQueue] | None = None, scheduling_condition: Callable[[Part], bool] | None = None)

Bases: ABC

Represents a component in the system.

A Part can be structural (composed of other parts) or behavioral (defined by a Python method).

BEHAVIORAL = 'behavioral'

exception BehaviorNotAllowedError

Bases: Exception

Exception raised when behavior() is called on a structural part.

exception NotRunnableError

Bases: Exception

Exception raised when an attempt is made to start the main run loop on a behavioral part. The simulation loop can only be started on structural parts.

STRUCTURAL = 'structural'

add_hook(hook_type: str, func: Callable)

Registers a function to be called at a specific lifecycle event.

This is useful for setting up or tearing down resources that are external to the simulation framework’s core logic, such as connecting to a physics engine or closing plot windows.

Parameters:
  • hook_type – The type of hook, either ‘init’ or ‘term’.

  • func – The function to register.
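The hook mechanism can be pictured as a per-part registry that init() and term() walk from the top of the hierarchy downward. The sketch below is a hypothetical stand-in (`ToyPart`), not the ml.engine implementation. It assumes that hooks take no arguments and that an unknown hook type raises ValueError; neither is stated in this reference.

```python
class ToyPart:
    """Hypothetical stand-in showing 'init' and 'term' hook dispatch."""

    def __init__(self, identifier, parts=None):
        self.identifier = identifier
        self.parts = parts or []
        self._hooks = {"init": [], "term": []}

    def add_hook(self, hook_type, func):
        if hook_type not in self._hooks:
            # Assumption: unknown hook types are rejected.
            raise ValueError(f"unknown hook type: {hook_type!r}")
        self._hooks[hook_type].append(func)

    def _run_hooks(self, hook_type):
        # Top-down: this part's hooks run before those of its sub-parts.
        for func in self._hooks[hook_type]:
            func()  # assumption: hooks take no arguments
        for child in self.parts:
            child._run_hooks(hook_type)

    def init(self):
        self._run_hooks("init")

    def term(self):
        self._run_hooks("term")


log = []
child = ToyPart("child")
root = ToyPart("root", parts=[child])
root.add_hook("init", lambda: log.append("root init"))
child.add_hook("init", lambda: log.append("child init"))
root.add_hook("term", lambda: log.append("root term"))
child.add_hook("term", lambda: log.append("child term"))
root.init()
root.term()
print(log)  # ['root init', 'child init', 'root term', 'child term']
```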

behavior()

Defines the behavior of the part.

This method should be overridden by subclasses of behavioral parts.

Raises:

BehaviorNotAllowedError – If this part is structural.

connect(master_port: Port, slave_port: Port)

Connects two ports, creating an interface.

Parameters:
  • master_port (Port) – The source port for the data transfer.

  • slave_port (Port) – The destination port for the data transfer.

connect_event_queue(master_queue: EventQueue, slave_queue: EventQueue)

Connects an event source or a parent queue to a child queue.

Parameters:
  • master_queue (EventQueue) – The source queue for the event transfer.

  • slave_queue (EventQueue) – The destination queue for the event transfer.

connect_event_source(source: EventSource, target_queue_identifier: str)

Connects an external event source to one of this part’s input queues.

This establishes both the data path (via an EventInterface) and the synchronization signal for waking up the part’s run loop.

Parameters:
  • source (EventSource) – The EventSource to connect.

  • target_queue_identifier (str) – The identifier of the target input queue on this part.

execute()

Executes a single cycle of the part. For structural parts, it propagates inputs and runs the inner loop once. For behavioral parts, it calls the behavior() method.

get_event_queue(identifier: str) -> EventQueue

Retrieves an event queue by its identifier.

Parameters:

identifier (str) – The name of the event queue to retrieve.

Returns:

The requested event queue instance.

Return type:

EventQueue

get_event_queues() -> List[EventQueue]

Retrieves all event queues of this part.

Returns:

A list of all event queue instances belonging to this part.

Return type:

List[EventQueue]

get_exception() -> Exception | None

Gets the exception that caused the run loop to terminate, if any.

Returns:

The exception object if one occurred, otherwise None.

Return type:

Optional[Exception]

get_full_identifier() -> str

Gets the fully qualified identifier of the part.

Returns:

The fully qualified identifier, including parent hierarchy.

Return type:

str

get_identifier() -> str

Gets the local identifier of the part.

Returns:

The local identifier of the part.

Return type:

str

get_interfaces() -> List[Interface]

Retrieves all data port interfaces (connections) of this structural part.

Returns:

A list of all interface instances.

Return type:

List[Interface]

get_part(identifier: str) -> Part

Retrieves an inner part by its identifier.

Parameters:

identifier (str) – The name of the inner part to retrieve.

Returns:

The requested inner part instance.

Return type:

Part

get_parts() -> List[Part]

Retrieves all inner parts of this structural part.

Returns:

A list of all inner part instances.

Return type:

List[Part]

get_port(identifier: str) -> Port

Retrieves a port by its identifier.

Parameters:

identifier (str) – The name of the port to retrieve.

Returns:

The requested port instance.

Return type:

Port

get_ports(direction: str) -> List[Port]

Retrieves all ports of a given direction.

Parameters:

direction (str) – The direction of ports to retrieve (Port.IN or Port.OUT).

Returns:

A list of ports matching the given direction.

Return type:

List[Port]

init()

Recursively initializes the part and all its sub-parts.

This method should be called once on the top-level part before starting the simulation. It executes all registered ‘init’ hooks in a top-down manner.

join(timeout: float | None = None)

Waits for the part’s run thread to complete.

Parameters:

timeout (Optional[float]) – The maximum time in seconds to wait for the thread to complete.

start(stop_condition: Callable[[Part], bool])

Starts the part’s run loop in a new thread.

This method is only applicable to structural parts.

Parameters:

stop_condition – A callable that returns True to stop the loop.

Raises:
  • NotRunnableError – If called on a behavioral part.

  • RuntimeError – If the part is already running.
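To make the start() contract concrete, here is a rough sketch of a run loop driven by a stop condition. `ToyRunner` is a hypothetical stand-in, not the ml.engine Part: it assumes the stop condition is checked before every cycle and that each cycle simply calls execute(), whereas the real engine may wait on events or apply a scheduling condition between cycles.

```python
import threading


class ToyRunner:
    """Hypothetical stand-in for the documented start()/join() behavior."""

    class NotRunnableError(Exception):
        pass

    def __init__(self, structural=True):
        self.structural = structural
        self.steps = 0
        self._thread = None

    def execute(self):
        self.steps += 1  # placeholder for one simulation cycle

    def _loop(self, stop_condition):
        # Assumption: the condition is evaluated before each cycle.
        while not stop_condition(self):
            self.execute()

    def start(self, stop_condition):
        if not self.structural:
            raise ToyRunner.NotRunnableError("behavioral parts cannot run")
        if self._thread is not None and self._thread.is_alive():
            raise RuntimeError("part is already running")
        self._thread = threading.Thread(
            target=self._loop, args=(stop_condition,), daemon=True
        )
        self._thread.start()

    def join(self, timeout=None):
        if self._thread is not None:
            self._thread.join(timeout)


runner = ToyRunner()
runner.start(lambda part: part.steps >= 5)  # stop after five cycles
runner.join(timeout=5)
print(runner.steps)  # 5
```

Because the stop condition receives the part itself, it can inspect any state reachable from the part, such as a step counter or a port value.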

term()

Recursively terminates the part and all its sub-parts.

This method should be called once on the top-level part after the simulation has finished. It executes all registered ‘term’ hooks in a top-down manner.

trace_log(message: str, details: dict | None = None)

Logs a custom message to the global tracer from within a part’s behavior.

This is the recommended way for user-defined parts to create trace output.

Parameters:
  • message (str) – The string message to log.

  • details (Optional[dict]) – An optional dictionary of key-value pairs for structured data.

class ml.engine.Port(identifier: str, direction: str)

Bases: object

Represents an input or output port for a Part.

IN = 'input'

OUT = 'output'

exception OverwriteError

Bases: Exception

Exception raised when set() is called on an already updated Port.

Parent = typing.Any

exception StaleReadError

Bases: Exception

Exception raised when get() is called on a port that has not been updated.

get() -> Any

Retrieves the payload from the port. This is a non-destructive read; the ‘updated’ flag is cleared by the engine after the owning part’s execute() method has completed for the current step.

Returns:

The payload currently held by the port.

Return type:

Any

get_direction() -> str

Gets the direction of the port.

Returns:

The direction of the port (e.g., ‘input’ or ‘output’).

Return type:

str

get_full_identifier() -> str

Gets the fully qualified identifier of the port.

Returns:

The fully qualified identifier, including parent hierarchy.

Return type:

str

get_identifier() -> str

Gets the local identifier of the port.

Returns:

The local identifier of the port.

Return type:

str

get_parent() -> Parent | None

Gets the parent component of the port.

Returns:

The parent object (typically a Part), or None if not set.

Return type:

Optional[Parent]

is_updated() -> bool

Checks if the port’s payload has been updated since the last clear.

Returns:

True if the port holds new data, False otherwise.

Return type:

bool

set(payload: Any)

Sets the payload of the port and marks it as updated.

The framework calls this method during data transfers. User code should call it inside a behavior() method to set the value of an output port.

Parameters:

payload – The new payload for the port.
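The interplay between set(), get(), the 'updated' flag, and the two port exceptions can be sketched as follows. `ToyPort` is a hypothetical stand-in rather than the ml.engine Port, and its `clear()` method is an invented name for the engine-side step that resets the flag after the owning part has executed.

```python
class ToyPort:
    """Hypothetical stand-in for the documented port semantics."""

    class OverwriteError(Exception):
        pass

    class StaleReadError(Exception):
        pass

    def __init__(self, identifier, direction):
        self._identifier = identifier
        self._direction = direction
        self._payload = None
        self._updated = False

    def set(self, payload):
        if self._updated:
            raise ToyPort.OverwriteError("port already holds unread data")
        self._payload = payload
        self._updated = True

    def get(self):
        if not self._updated:
            raise ToyPort.StaleReadError("port has not been updated")
        return self._payload  # non-destructive: the flag stays set

    def is_updated(self):
        return self._updated

    def clear(self):
        # Hypothetical name for the reset the engine performs after execute().
        self._updated = False


port = ToyPort("in", "input")
port.set(42)
first = port.get()
second = port.get()  # reading twice is fine until the flag is cleared
print(first, second)  # 42 42
```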

ml.engine.sequential_execution(scheduled_parts: Set[Part])

A simple execution strategy that executes scheduled parts sequentially.

It iterates through the provided set of parts and calls the execute() method on each one. The order of execution is not guaranteed due to the nature of sets.

Parameters:

scheduled_parts – A set of parts that have been scheduled for execution.
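Since Part accepts any callable of type Callable[[Set[Part]], None] as its execution_strategy, a deterministic alternative is easy to sketch. The code below is illustrative only: `StubPart` is a hypothetical stand-in for Part, `sequential_sketch` approximates the behavior described above, and `sorted_execution` is an invented strategy that orders parts by local identifier.

```python
class StubPart:
    """Hypothetical stand-in exposing only what a strategy needs."""

    def __init__(self, identifier, log):
        self._identifier = identifier
        self._log = log

    def get_identifier(self):
        return self._identifier

    def execute(self):
        self._log.append(self._identifier)


def sequential_sketch(scheduled_parts):
    """Execute each scheduled part once, in set iteration order."""
    for part in scheduled_parts:
        part.execute()


def sorted_execution(scheduled_parts):
    """Execute scheduled parts in identifier order for reproducible runs."""
    for part in sorted(scheduled_parts, key=lambda p: p.get_identifier()):
        part.execute()


log = []
parts = {StubPart(name, log) for name in ("c", "a", "b")}
sorted_execution(parts)
print(log)  # ['a', 'b', 'c']
```

A sorted strategy trades a small sorting cost per cycle for repeatable traces, which can help when comparing simulation runs.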