Engine API
This is the core simulation engine.
- class ml.engine.Blueprint(part_to_be_proxied: Part)
Bases: Part
A smart proxy for a Part that runs in a separate process.
This class inherits from Part to provide a consistent interface to its parent. Internally, it manages the lifecycle and communication (via queues) of a worker process that runs the real Part instance.
- execute()
Handles the full IPC cycle for executing the real part in the worker process.
This method gathers input data from its own updated ports, sends an ‘EXECUTE’ command to the worker, blocks until the execution is complete, and then applies the returned output data to its own output ports.
- Raises:
Exception – If the worker process raises an exception during execution.
Port.PayloadError – If the worker output data type does not match the blueprint port type.
Port.OverwriteError – If a data transfer attempts to overwrite an unconsumed port payload.
- get_command_queue() Queue
Returns the queue used to send commands to the worker process.
- get_result_queue() Queue
Returns the queue used to receive results from the worker process.
- init()
No-op init. The real part’s init() is called inside the worker process. This is called by the parent’s init() cascade.
- start(stop_condition: Callable[[Part], bool])
Creates and starts the worker process for the real Part.
This method is called by the parent’s start() cascade. It configures and launches a new multiprocessing.Process with worker_process_loop as its target.
- Parameters:
stop_condition – This argument is part of the method signature to conform to the parent Part.start method but is not used by the Blueprint, as the stop condition is managed by the parent’s run loop.
- stop()
Sends the SHUTDOWN command to the worker process.
- term()
Sends the SHUTDOWN command and joins the worker process.
- wait_for_ready()
Blocks until the worker process signals it has initialized.
- static worker_process_loop(part_info: dict, part_full_id_for_logging: str, command_queue: Queue, result_queue: Queue, log_queue: Queue | None, error_queue: Queue | None)
The main, persistent loop for a worker process.
This static method is the target function for the multiprocessing.Process. It handles the entire lifecycle of the real Part within the new process, including initialization, command processing, execution, and termination.
- Parameters:
part_info – A dictionary containing metadata to reconstruct the real Part.
part_full_id_for_logging – The full identifier for logging purposes.
command_queue – The queue for receiving commands (‘EXECUTE’, ‘SHUTDOWN’) from the main process.
result_queue – The queue for sending results (‘WORKER_READY’, ‘EXECUTION_COMPLETE’, exceptions) back to the main process.
log_queue – The queue for sending structured log records to the main process’s tracer.
error_queue – The queue for sending formatted error strings to the main process’s central error handler.
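The command/result handshake described above can be sketched with standard-library queues. This is only an illustration under stated assumptions, not the library's implementation: the worker loop runs in a thread with `queue.Queue` instead of in a `multiprocessing.Process`, and the names `worker_loop` and `double`, the tuple message layout, and the `"ERROR"` status are hypothetical. Only the command and result strings ('EXECUTE', 'SHUTDOWN', 'WORKER_READY', 'EXECUTION_COMPLETE') come from the text.

```python
import queue
import threading


def worker_loop(behavior, command_queue, result_queue):
    """Hypothetical stand-in for a persistent worker loop."""
    # Signal readiness once initialization is done.
    result_queue.put(("WORKER_READY", None))
    while True:
        command, inputs = command_queue.get()
        if command == "SHUTDOWN":
            break
        if command == "EXECUTE":
            try:
                result_queue.put(("EXECUTION_COMPLETE", behavior(inputs)))
            except Exception as exc:
                # "ERROR" is an invented status used to forward failures.
                result_queue.put(("ERROR", exc))


def double(inputs):
    """Hypothetical behavior: doubles every input value."""
    return {name: value * 2 for name, value in inputs.items()}


command_queue = queue.Queue()
result_queue = queue.Queue()
worker = threading.Thread(
    target=worker_loop, args=(double, command_queue, result_queue)
)
worker.start()

# Counterpart of wait_for_ready(): block until the worker reports in.
ready_status, _ = result_queue.get(timeout=5)

# Counterpart of execute(): send inputs, block, then collect outputs.
command_queue.put(("EXECUTE", {"x": 21}))
status, outputs = result_queue.get(timeout=5)

# Counterpart of term(): request shutdown and join the worker.
command_queue.put(("SHUTDOWN", None))
worker.join(timeout=5)
```

With real processes the same exchange would presumably use `multiprocessing.Queue` objects, which requires every message to be picklable.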
- class ml.engine.EventInterface(master_queue: EventQueue, slave_queue: EventQueue)
Bases: object
Represents a connection between two event queues.
- exception ValidationError
Bases: Exception
Exception raised when an event queue connection fails validation.
- get_full_identifier() str
Dynamically generates the identifier string for this event interface.
- Returns:
The fully qualified connection string (e.g., “part.q_out->part2.q_in”).
- Return type:
str
- get_identifier() str
Gets the identifier for this event interface.
- Returns:
The fully qualified connection string.
- Return type:
str
- class ml.engine.EventQueue(identifier: str, direction: str, size: int | None = None, mode: QueueMode = QueueMode.FIFO, type: Type | None = None, payload_check: bool = False, conf: Any | None = None)
Bases: object
Represents a queue for storing event payloads, with different behaviors for input and output directions.
Methods meant to be overridden:
validate_connection: Validates if this queue can be connected to a specific slave queue.
- exception EmptyError
Bases: Exception
Exception raised when pop() is called on an empty EventQueue.
- exception FullError
Bases: Exception
Exception raised when push() is called on a full EventQueue.
- IN = 'input'
- OUT = 'output'
- Parent
alias of Any
- exception PayloadError
Bases: Exception
Exception raised when push() is called with a payload of incorrect type.
- clear()
Removes all event payloads from the queue.
- get_direction() str
Gets the direction of the event queue.
- Returns:
The direction of the event queue (e.g., ‘input’ or ‘output’).
- Return type:
str
- get_full_identifier() str
Gets the fully qualified identifier of the event queue.
- Returns:
The fully qualified identifier, including parent hierarchy.
- Return type:
str
- get_identifier() str
Gets the local identifier of the event queue.
- Returns:
The local identifier of the event queue.
- Return type:
str
- get_type() Type | None
Gets the expected type of the payload.
- Returns:
The expected type, or None if no type has been specified.
- Return type:
Optional[Type]
- is_empty() bool
Checks if the event queue is empty.
- Returns:
True if the queue has no payloads, False otherwise.
- Return type:
bool
- is_full() bool
Checks if the event queue is full.
- Returns:
True if the queue has reached its capacity, False otherwise.
- Return type:
bool
- peek() Any | None
Gets the payload from an OUT queue without consuming it.
- Returns:
The payload if the queue is an OUT queue and not empty, otherwise None.
- Return type:
Optional[Any]
- pop() Any | None
Removes and returns an event payload from an IN queue.
The payload returned depends on the queue’s mode (FIFO or LIFO). For OUT queues, this is a destructive read of the single payload.
- Returns:
The payload removed from the queue.
- Return type:
Optional[Any]
- Raises:
EventQueue.EmptyError – If the queue is empty.
- push(payload: Any, overwrite: bool = False)
Adds an event payload to the queue based on its direction.
- Parameters:
payload – The event payload to add.
overwrite – For IN queues, if True, this will replace an existing payload if the queue is full.
- Raises:
EventQueue.FullError – If the queue is full and overwrite is False (for IN queues, or propagated from slave for OUT queues).
EventQueue.PayloadError – If the payload type does not match the queue’s declared type.
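The push/pop rules for an input queue can be sketched with a small bounded container. This is a hypothetical stand-in (`BoundedQueue`, with its own `FullError` and `EmptyError`), not the EventQueue class. In particular, it assumes that `overwrite=True` discards the oldest payload, which the text above does not specify.

```python
from collections import deque


class FullError(Exception):
    """Raised when pushing to a full queue without overwrite."""


class EmptyError(Exception):
    """Raised when popping from an empty queue."""


class BoundedQueue:
    """Hypothetical stand-in for an input queue; not the EventQueue class."""

    def __init__(self, size=None, lifo=False):
        self._items = deque()
        self._size = size  # None means unbounded
        self._lifo = lifo

    def is_full(self):
        return self._size is not None and len(self._items) >= self._size

    def push(self, payload, overwrite=False):
        if self.is_full():
            if not overwrite:
                raise FullError("queue is full")
            # Assumption: overwriting discards the oldest payload.
            self._items.popleft()
        self._items.append(payload)

    def pop(self):
        if not self._items:
            raise EmptyError("queue is empty")
        # LIFO returns the newest payload, FIFO the oldest.
        return self._items.pop() if self._lifo else self._items.popleft()


fifo = BoundedQueue(size=2)
fifo.push("a")
fifo.push("b")
fifo.push("c", overwrite=True)  # drops "a" under the stated assumption
fifo_order = [fifo.pop(), fifo.pop()]

lifo = BoundedQueue(size=2, lifo=True)
lifo.push("a")
lifo.push("b")
lifo_order = [lifo.pop(), lifo.pop()]
```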
- validate_connection(slave_queue: EventQueue) bool
Validates if this queue can be connected to the given slave queue.
This method is meant to be overridden for custom validation logic. The default implementation checks if the master queue’s type is a subclass of the slave queue’s type, if both are specified.
- Parameters:
slave_queue – The slave queue to connect to.
- Returns:
True if the connection is valid, False otherwise.
- Return type:
bool
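The default compatibility rule described above amounts to a subclass check. The helper below is a hypothetical free function (`types_compatible` is not part of the API), and it assumes that a connection counts as valid when either side leaves its type unspecified.

```python
from typing import Optional


def types_compatible(
    master_type: Optional[type], slave_type: Optional[type]
) -> bool:
    """Return True if a master of master_type may feed a slave of slave_type."""
    # Assumption: with a type missing on either side, nothing is checked.
    if master_type is None or slave_type is None:
        return True
    return issubclass(master_type, slave_type)


narrow_to_wide = types_compatible(bool, int)  # bool is a subclass of int
wide_to_narrow = types_compatible(int, bool)
untyped_master = types_compatible(None, int)
```

An overriding validate_connection could apply a stricter or looser rule, for example requiring identical types.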
- class ml.engine.EventSource(identifier: str, on_full: OnFullBehavior = OnFullBehavior.FAIL, size: int | None = None, mode: QueueMode = QueueMode.FIFO, type: Type | None = None, payload_check: bool = False, conf: Any | None = None)
Bases: ABC
Abstract base class for components that generate events, typically in a separate thread.
This class provides the core infrastructure for starting, stopping, and managing the lifecycle of an event source thread.
Methods meant to be overridden:
run_loop: Contains the logic for generating events.
- emit(payload: Any)
Emits an event with a given payload.
The output queue will handle the immediate transfer, and this method will signal any waiting Part. The output queue is cleared after the attempt, regardless of success.
- Parameters:
payload – The payload of the event to emit.
- Raises:
EventQueue.FullError – If the target queue is full and the policy is FAIL.
EventQueue.PayloadError – If the payload type does not match the output queue’s declared type.
- get_exception() Exception | None
Gets the exception that caused the run loop to terminate, if any.
- Returns:
The exception object if one occurred, otherwise None.
- Return type:
Optional[Exception]
- get_full_identifier() str
Gets the fully qualified identifier of the event source.
- Returns:
The fully qualified identifier.
- Return type:
str
- get_identifier() str
Gets the local identifier of the event source.
- Returns:
The local identifier of the event source.
- Return type:
str
- is_running() bool
Checks if the event source thread is running.
- Returns:
True if the source is running, False otherwise.
- Return type:
bool
- abstractmethod run_loop()
The main loop for the event source. This method must be implemented by subclasses.
The loop should periodically check self.is_running() or self.stop_event_is_set() and exit gracefully once a stop has been requested.
- start()
Starts the event source in a new thread.
- stop()
Stops the event source thread and signals any listeners.
- stop_event_is_set() bool
Checks if the stop event has been set.
- Returns:
True if the stop event is set, False otherwise.
- Return type:
bool
- wait(timeout: float | None = None)
Waits for the event source thread to finish.
- Parameters:
timeout – The maximum time in seconds to wait for the thread to complete.
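The start/stop/wait lifecycle and a cooperative run loop can be sketched with `threading` alone. `TickSource` below is a hypothetical stand-in, not a subclass of EventSource. Appending to `emitted` stands in for calling emit(), and the `limit` parameter exists only to keep the example finite.

```python
import threading


class TickSource:
    """Hypothetical stand-in for an event source; not the EventSource class."""

    def __init__(self, limit):
        self._stop_event = threading.Event()
        self._thread = None
        self._limit = limit
        self.emitted = []

    def start(self):
        self._thread = threading.Thread(target=self.run_loop)
        self._thread.start()

    def stop(self):
        self._stop_event.set()

    def wait(self, timeout=None):
        self._thread.join(timeout)

    def run_loop(self):
        count = 0
        # Check the stop flag on every iteration so stop() takes effect promptly.
        while not self._stop_event.is_set() and count < self._limit:
            self.emitted.append(count)  # stands in for emit(payload)
            count += 1
            # Waiting on the event lets a stop request cut the pause short.
            self._stop_event.wait(0.001)


source = TickSource(limit=3)
source.start()
source.wait(timeout=5)
finished = not source._thread.is_alive()
```

Waiting on the stop event instead of calling `time.sleep` is a design choice of this sketch: it keeps the loop responsive to stop() even with long intervals between events.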
- class ml.engine.Interface(master_port: Port, slave_port: Port)
Bases: object
Represents a connection between two ports, a master and a slave.
- exception ValidationError
Bases: Exception
Exception raised when a port connection fails validation.
- get_full_identifier() str
Dynamically generates the identifier string for this interface.
- Returns:
The fully qualified connection string (e.g., “part.out->part2.in”).
- Return type:
str
- get_identifier() str
Gets the identifier for this interface.
- Returns:
The fully qualified connection string.
- Return type:
str
- class ml.engine.Part(identifier: str, execution_strategy: Any | None = None, ports: List[Port] | None = None, parts: Dict[str, Part] | None = None, event_queues: List[EventQueue] | None = None, scheduling_condition: Callable[[...], bool] | None = None, scheduling_args: tuple | None = None, conf: Any | None = None, **kwargs)
Bases: ABC
Represents a component in the system.
A Part can be structural (composed of other parts) or behavioral (defined by a Python method).
Methods meant to be overridden:
behavior: Defines the logic for behavioral parts.
configure: Configures the part and its children.
configure_item: Configures a Port or EventQueue before insertion.
validate_insertion: Validates if a Port or EventQueue can be added.
- BEHAVIORAL = 'behavioral'
- BLUEPRINT = 'blueprint'
- exception BehaviorNotAllowedError
Bases: Exception
Exception raised when behavior() is called on a structural part.
- exception NotExecutableError
Bases: Exception
Exception raised when the main run loop is attempted on a behavioral part. The loop can only be started on structural parts.
- STRUCTURAL = 'structural'
- exception ValidationError
Bases: Exception
Exception raised when an item (Port/EventQueue) fails validation during insertion.
- add_hook(hook_type: str, func: Callable)
Registers a function to be called at a specific lifecycle event.
This is useful for setting up or tearing down resources that are external to the framework’s core logic, such as connecting to a physics engine or closing plot windows.
- Parameters:
hook_type – The type of hook, either ‘init’ or ‘term’.
func – The function to register.
- Raises:
ValueError – If the hook type is invalid.
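The hook mechanism can be pictured as a small registry keyed by hook type. `HookRegistry` and its `run` method are hypothetical names used only for illustration, and running hooks in registration order is an assumption of this sketch rather than documented behavior.

```python
class HookRegistry:
    """Hypothetical stand-in for a part's hook bookkeeping."""

    VALID_TYPES = ("init", "term")

    def __init__(self):
        self._hooks = {hook_type: [] for hook_type in self.VALID_TYPES}

    def add_hook(self, hook_type, func):
        # Reject anything other than the two documented hook types.
        if hook_type not in self._hooks:
            raise ValueError(f"invalid hook type: {hook_type!r}")
        self._hooks[hook_type].append(func)

    def run(self, hook_type):
        # Assumption: hooks run in the order they were registered.
        for func in self._hooks[hook_type]:
            func()


events = []
registry = HookRegistry()
registry.add_hook("init", lambda: events.append("open plot window"))
registry.add_hook("term", lambda: events.append("close plot window"))
registry.run("init")
registry.run("term")
```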
- add_part(part: Part)
Adds a child part to this structural part.
- Parameters:
part – The child part to add.
- Raises:
ValueError – If a part with the same identifier already exists in this part.
- behavior()
Defines the behavior of the part.
This method should be overridden by subclasses of behavioral parts.
- Raises:
BehaviorNotAllowedError – If this part is structural.
- configure(conf: Any | None)
Recursively configures the part and all its sub-parts.
The default behavior is to store the configuration object and pass it down to all children. This method can be overridden by subclasses for custom configuration logic.
- Parameters:
conf – The configuration object to apply.
- configure_item(item: Any)
Configures a Port or EventQueue before it is added to the part.
This method is meant to be overridden for custom configuration logic.
- Parameters:
item – The Port or EventQueue to configure.
- connect(master_port: Port, slave_port: Port)
Connects two ports, creating an interface.
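- Parameters:
master_port (Port) – The source (master) port of the connection.
slave_port (Port) – The destination (slave) port of the connection.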
- Raises:
ValueError – If the port connection is invalid.
Interface.ValidationError – If the ports are incompatible.
- connect_event_queue(master_queue: EventQueue, slave_queue: EventQueue)
Connects an event source or a parent queue to a child queue.
- Parameters:
master_queue (EventQueue) – The source queue for the event transfer.
slave_queue (EventQueue) – The destination queue for the event transfer.
- Raises:
ValueError – If the slave queue does not belong to this part or its children, or if the connection is invalid.
EventInterface.ValidationError – If the queues are incompatible.
- connect_event_source(source: EventSource, target_queue_identifier: str)
Connects an external event source to one of this part’s input queues.
This establishes both the data path (via an EventInterface) and the synchronization signal for waking up the part’s run loop.
- Parameters:
source (EventSource) – The EventSource to connect.
target_queue_identifier (str) – The identifier of the target input queue on this part.
- Raises:
KeyError – If the target queue identifier is not found.
EventInterface.ValidationError – If the event source queue and target queue are incompatible.
ValueError – If the connection is invalid.
- execute()
Executes a single cycle of the part. For structural parts, it propagates inputs and runs the inner loop once. For behavioral parts, it calls the behavior() method.
- Raises:
Exception – If the behavioral part’s behavior method raises an exception.
Port.PayloadError – If a data transfer fails due to type mismatch.
EventQueue.PayloadError – If an event transfer fails due to type mismatch.
Port.OverwriteError – If a data transfer attempts to overwrite an unconsumed port payload.
- get_event_queue(identifier: str) EventQueue
Retrieves an event queue by its identifier.
- Parameters:
identifier (str) – The name of the event queue to retrieve.
- Returns:
The requested event queue instance.
- Return type:
EventQueue
- Raises:
KeyError – If the event queue is not found.
- get_event_queues() List[EventQueue]
Retrieves all event queues of this part.
- Returns:
A list of all event queue instances belonging to this part.
- Return type:
List[EventQueue]
- get_exception() Exception | None
Gets the exception that caused the run loop to terminate, if any.
- Returns:
The exception object if one occurred, otherwise None.
- Return type:
Optional[Exception]
- get_full_identifier() str
Gets the fully qualified identifier of the part.
- Returns:
The fully qualified identifier, including parent hierarchy.
- Return type:
str
- get_identifier() str
Gets the local identifier of the part.
- Returns:
The local identifier of the part.
- Return type:
str
- get_interfaces() List[Interface]
Retrieves all data port interfaces (connections) of this structural part.
- Returns:
A list of all interface instances.
- Return type:
List[Interface]
- get_part(identifier: str) Part
Retrieves an inner part by its identifier.
- Parameters:
identifier (str) – The name of the inner part to retrieve.
- Returns:
The requested inner part instance.
- Return type:
Part
- Raises:
KeyError – If the part is not found.
- get_parts() List[Part]
Retrieves all inner parts of this structural part.
- Returns:
A list of all inner part instances.
- Return type:
List[Part]
- get_port(identifier: str) Port
Retrieves a port by its identifier.
- Parameters:
identifier (str) – The name of the port to retrieve.
- Returns:
The requested port instance.
- Return type:
Port
- Raises:
KeyError – If the port is not found.
- get_ports(direction: str) List[Port]
Retrieves all ports of a given direction.
- Parameters:
direction (str) – The direction of ports to retrieve (Port.IN or Port.OUT).
- Returns:
A list of ports matching the given direction.
- Return type:
List[Port]
- Raises:
ValueError – If the direction is invalid.
- init()
Recursively initializes the part and all its sub-parts.
This method should be called once on the top-level part before starting it. It executes all registered ‘init’ hooks in a top-down manner.
- is_running() bool
Checks if the part’s execution thread is currently running.
This is only applicable for structural parts that have been started.
- Returns:
True if the thread exists and is running, False otherwise.
- Return type:
bool
- read(identifier: str)
Reads data from an input port based on its semantic.
If the port is Port.NOT_PERSISTENT, the port must have been updated in the current step. If the port is Port.PERSISTENT, the last known payload is returned even if the port hasn’t been updated in the current step.
- Parameters:
identifier – The local identifier of the port to read from.
- Returns:
The payload read from the port.
- Return type:
Any
- Raises:
KeyError – If the port is not found.
Port.StaleReadError – If the port is NOT_PERSISTENT and has not been updated.
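The difference between the two port semantics can be shown with a toy port that tracks an "updated" flag. `SketchPort` and its `clear_updated` method are hypothetical stand-ins for the Port class and for the engine clearing the flag at the end of a step.

```python
class StaleReadError(Exception):
    """Raised when a non-persistent port is read without fresh data."""


class SketchPort:
    """Hypothetical stand-in for an input port; not the Port class."""

    def __init__(self, persistent):
        self.persistent = persistent
        self.payload = None
        self.updated = False

    def set(self, payload):
        self.payload = payload
        self.updated = True

    def clear_updated(self):
        # Stands in for the engine clearing the flag at the end of a step.
        self.updated = False

    def read(self):
        # Only non-persistent ports insist on data from the current step.
        if not self.persistent and not self.updated:
            raise StaleReadError("port was not updated in this step")
        return self.payload


sticky = SketchPort(persistent=True)
sticky.set(5)
sticky.clear_updated()
sticky_value = sticky.read()  # last known payload is still readable

pulse = SketchPort(persistent=False)
pulse.set(7)
pulse.clear_updated()
try:
    pulse.read()
    stale = False
except StaleReadError:
    stale = True
```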
- start(stop_condition: Callable[[Part], bool])
Starts the part’s run loop in a new thread.
This method is only applicable to structural parts.
- Parameters:
stop_condition – A callable that returns True to stop the loop.
- Raises:
NotExecutableError – If called on a behavioral part.
RuntimeError – If the part is already running.
- stop()
Signals the part’s run loop and all its children to stop. This is a non-blocking call. Use wait() to block until the thread finishes.
- term()
Recursively terminates the part and all its sub-parts.
This method should be called once on the top-level part after it has finished its execution. It executes all registered ‘term’ hooks in a top-down manner.
- trace_log(message: str, details: dict | None = None)
Logs a custom message to the global tracer from within a part’s behavior.
This is the recommended way for user-defined parts to create trace output.
- Parameters:
message – The string message to log.
details – An optional dictionary of key-value pairs for structured data.
- validate_insertion(item: Any) bool
Validates if an item (Port or EventQueue) can be added to this part.
This method is meant to be overridden for custom validation logic.
- Parameters:
item – The item to validate (Port or EventQueue).
- Returns:
True if the item is valid, False otherwise.
- Return type:
bool
- wait(timeout: float | None = None)
Waits for the part’s run thread to complete.
- Parameters:
timeout – The maximum time in seconds to wait for the thread to complete.
- wait_for_ready()
Waits for all managed worker processes to signal that they are ready.
This method is only applicable for structural parts using a process-based execution strategy (i.e., mode=’process’). It blocks until each worker process has completed its initialization and sent a ready signal.
If called on a part that uses a sequential or thread-based strategy, this method will do nothing and return immediately.
- wire(master_path: str, slave_path: str)
Connects two ports using their string paths relative to this part.
- Parameters:
master_path – Path to the source port (e.g., ‘source.clk’ or ‘in_port’).
slave_path – Path to the destination port (e.g., ‘dut.clk’ or ‘out_port’).
- Raises:
ValueError – If a path cannot be resolved to a port.
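One way to picture path resolution is a walk over dot-separated names, where every segment but the last names a child part and the last names a port. The sketch below uses nested dictionaries in place of Part objects. `resolve_port` is a hypothetical helper, and support for more than one level of nesting is an assumption.

```python
def resolve_port(part, path):
    """Resolve a dotted path against nested dicts standing in for a part tree."""
    *part_names, port_name = path.split(".")
    current = part
    for name in part_names:
        try:
            current = current["parts"][name]
        except KeyError:
            raise ValueError(f"cannot resolve part {name!r} in {path!r}") from None
    try:
        return current["ports"][port_name]
    except KeyError:
        raise ValueError(f"cannot resolve port {port_name!r} in {path!r}") from None


# Strings stand in for port objects so the result is easy to inspect.
top = {
    "ports": {"in_port": "top.in_port"},
    "parts": {
        "source": {"ports": {"clk": "top.source.clk"}, "parts": {}},
        "dut": {"ports": {"clk": "top.dut.clk"}, "parts": {}},
    },
}

child_port = resolve_port(top, "source.clk")
own_port = resolve_port(top, "in_port")
```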
- wire_event(master_path: str, slave_path: str)
Connects two event queues using their string paths relative to this part.
- Parameters:
master_path – Path to the source queue (e.g., ‘timer_q’ or ‘source.out_q’).
slave_path – Path to the destination queue (e.g., ‘sync.timer_in’).
- Raises:
ValueError – If a path cannot be resolved to an event queue.
- write(identifier: str, payload: Any)
Writes data to an output port.
- Parameters:
identifier – The local identifier of the port to write to.
payload – The data to write.
- Raises:
KeyError – If the port is not found.
Port.PayloadError – If the payload type does not match the port’s declared type.
Port.OverwriteError – If the port has already been updated in the current step, or its payload hasn’t been consumed in the previous steps.
- class ml.engine.Port(identifier: str, direction: str, type: Type | None = None, init_value: Any | None = None, semantic: str = 'not persistent', payload_check: bool = False, conf: Any | None = None)
Bases: object
Represents an input or output port for a Part.
Methods meant to be overridden:
validate_connection: Validates if this port can be connected to a specific slave port.
- IN = 'input'
- NOT_PERSISTENT = 'not persistent'
- OUT = 'output'
- exception OverwriteError
Bases: Exception
Exception raised when set() is called on an already updated Port.
- PERSISTENT = 'persistent'
- Parent
alias of Any
- exception PayloadError
Bases: Exception
Exception raised when set() is called with a payload of incorrect type.
- exception StaleReadError
Bases: Exception
Exception raised when get() is called on a port that has not been updated.
- get() Any
Retrieves the payload from the port. This is a non-destructive read; the ‘updated’ flag is cleared by the engine after the owning part’s execute() method has completed for the current step.
- Returns:
The payload currently held by the port.
- Return type:
Any
- Raises:
Port.StaleReadError – If the port has not been updated.
- get_direction() str
Gets the direction of the port.
- Returns:
The direction of the port (e.g., ‘input’ or ‘output’).
- Return type:
str
- get_full_identifier() str
Gets the fully qualified identifier of the port.
- Returns:
The fully qualified identifier, including parent hierarchy.
- Return type:
str
- get_identifier() str
Gets the local identifier of the port.
- Returns:
The local identifier of the port.
- Return type:
str
- get_parent() Parent | None
Gets the parent component of the port.
- Returns:
The parent object (typically a Part), or None if not set.
- Return type:
Optional[Parent]
- get_semantic() str
Gets the semantic type of the port.
- Returns:
The semantic type (e.g., ‘persistent’ or ‘not persistent’).
- Return type:
str
- get_type() Type | None
Gets the expected type of the payload.
- Returns:
The expected type, or None if no type has been specified.
- Return type:
Optional[Type]
- is_updated() bool
Checks if the port’s payload has been updated since the last clear.
- Returns:
True if the port holds new data, False otherwise.
- Return type:
bool
- peek() Any
Retrieves the payload from the port without checking the ‘updated’ flag. This is a non-destructive read intended for monitoring or debugging from outside the synchronous dataflow execution.
- Returns:
The payload currently held by the port.
- Return type:
Any
- set(payload: Any)
Sets the payload of the port and marks it as updated.
This method is called by the framework during data transfers and should be called by the user inside a behavior() method to set the value of an output port.
- Parameters:
payload – The new payload for the port.
- Raises:
Port.OverwriteError – If the port has already been updated in the current step, or its payload hasn’t been consumed in the previous steps.
Port.PayloadError – If the payload type does not match the port’s declared type.
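The two failure modes of set() can be illustrated with a toy output port. `SketchOutputPort`, `expected_type`, and `consume` are hypothetical names. The sketch also assumes that the type check runs only when `payload_check` is enabled and that it runs before the overwrite check, neither of which the text above confirms.

```python
class OverwriteError(Exception):
    """Raised when a port still holds an unconsumed payload."""


class PayloadError(Exception):
    """Raised when a payload does not match the declared type."""


class SketchOutputPort:
    """Hypothetical stand-in for an output port; not the Port class."""

    def __init__(self, expected_type=None, payload_check=False):
        self.expected_type = expected_type
        self.payload_check = payload_check
        self.payload = None
        self.updated = False

    def set(self, payload):
        # Assumption: the type check runs only when payload_check is enabled.
        if (
            self.payload_check
            and self.expected_type is not None
            and not isinstance(payload, self.expected_type)
        ):
            raise PayloadError(f"expected {self.expected_type.__name__}")
        if self.updated:
            raise OverwriteError("previous payload has not been consumed")
        self.payload = payload
        self.updated = True

    def consume(self):
        # Stands in for the engine clearing the flag after a transfer.
        self.updated = False
        return self.payload


port = SketchOutputPort(expected_type=int, payload_check=True)
port.set(1)
outcomes = []
for candidate in (2, "three"):
    try:
        port.set(candidate)
        outcomes.append("ok")
    except OverwriteError:
        outcomes.append("overwrite")
    except PayloadError:
        outcomes.append("payload")
port.consume()
port.set(4)  # accepted once the earlier payload was consumed
```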
- validate_connection(slave_port: Port) bool
Validates if this port can be connected to the given slave port.
This method is meant to be overridden for custom validation logic. The default implementation checks if the master port’s type is a subclass of the slave port’s type, if both are specified.
- Parameters:
slave_port – The slave port to connect to.
- Returns:
True if the connection is valid, False otherwise.
- Return type:
bool