# Source code for ml.parts

import numbers
from typing import Optional

from .engine import Part, Port, EventQueue

class EventToDataSynchronizer(Part):
    """
    Behavioral part that bridges an asynchronous event source into the
    synchronous dataflow domain.

    The part owns one input EventQueue (size 1) and one output Port. Each
    time it runs with a non-empty queue, it pops one item from the queue and
    writes the popped value to the output port. A typical use is turning a
    trigger (for example a timer tick) into data that dataflow-driven parts
    can consume in the next simulation step.
    """

    def __init__(self, identifier: str, input_queue_id: str, output_port_id: str):
        """
        Build the part with its single input queue and single output port.

        Args:
            identifier: The unique name for this part.
            input_queue_id: The identifier for the input EventQueue.
            output_port_id: The identifier for the output Port.
        """
        # The queue is created before the port, then both are handed to Part.
        inbound_queue = EventQueue(input_queue_id, EventQueue.IN, size=1)
        outbound_port = Port(output_port_id, Port.OUT)
        super().__init__(
            identifier=identifier,
            ports=[outbound_port],
            event_queues=[inbound_queue],
        )

        # Remember the identifiers so behavior() can look both ends up later.
        self._input_queue_id = input_queue_id
        self._output_port_id = output_port_id

    def behavior(self):
        """
        Move one pending item from the input queue to the output port.

        Nothing happens when the queue is empty. Otherwise exactly one item
        is popped and the popped value is set on the output port.
        """
        source = self.get_event_queue(self._input_queue_id)
        if source.is_empty():
            return

        popped = source.pop()
        self.get_port(self._output_port_id).set(popped)
class Operator(Part):
    """
    Generic, stateless behavioral part that applies a user-supplied function
    to its inputs.

    The part is scheduled only once every one of its input ports reports an
    update. It then calls `op` with the input values, in port order, and
    writes the result to its single output port.

    Ports:
        - `in_0`, `in_1`, ...: `inputs` input ports.
        - `out`: The single output port.
    """

    def __init__(self, identifier: str, inputs: int, op: callable):
        """
        Build the operator with `inputs` input ports and one output port.

        Args:
            identifier: The unique name for this part.
            inputs: The number of input ports to create.
            op: A callable that takes `inputs` positional arguments and
                returns a single value.
        """
        def all_inputs_updated(part):
            # Run only when every IN port has fresh data.
            return all(port.is_updated() for port in part.get_ports(Port.IN))

        # The output port comes first, followed by in_0 .. in_{inputs-1}.
        port_list = [
            Port('out', Port.OUT),
            *(Port(f'in_{index}', Port.IN) for index in range(inputs)),
        ]
        super().__init__(
            identifier=identifier,
            ports=port_list,
            scheduling_condition=all_inputs_updated,
        )
        self.inputs = inputs
        self.op = op

    def behavior(self):
        """
        Read every input port, apply `op`, and publish the result on `out`.

        No `is_updated()` checks are made here; the scheduling condition
        installed in `__init__` already requires all inputs to be updated
        before this method runs.
        """
        arguments = [
            self.get_port(f'in_{index}').get() for index in range(self.inputs)
        ]
        result = self.op(*arguments)
        self.get_port('out').set(result)
class PID(Part):
    """
    A behavioral part implementing a standard PID
    (Proportional-Integral-Derivative) controller.

    It calculates an actuation value based on an error signal and the
    passage of time. The time step is calculated dynamically from successive
    values on the `time` input port.

    Ports:
        - `time` (IN): Receives the current simulation time.
        - `error` (IN): Receives the current error signal
          (setpoint - measurement).
        - `kp_in` (IN): Receives updates for the proportional gain.
        - `ki_in` (IN): Receives updates for the integral gain.
        - `kd_in` (IN): Receives updates for the derivative gain.
        - `actuation` (OUT): Outputs the calculated control signal.
    """

    def __init__(self, identifier: str, initial_kp: float, initial_ki: float,
                 initial_kd: float, initial_integral: float = 0.0,
                 initial_error: float = 0.0,
                 integral_min: Optional[float] = None,
                 integral_max: Optional[float] = None):
        """
        Initializes the PID controller.

        Args:
            identifier: The unique name for this part.
            initial_kp: The initial proportional gain.
            initial_ki: The initial integral gain.
            initial_kd: The initial derivative gain.
            initial_integral: The starting value for the integral term.
            initial_error: The error value from the previous step, used for
                the first derivative calculation.
            integral_min: The minimum value to which the integral term will
                be clamped, or None for no lower bound.
            integral_max: The maximum value to which the integral term will
                be clamped, or None for no upper bound.
        """
        ports = [
            Port('time', Port.IN),
            Port('error', Port.IN),
            Port('kp_in', Port.IN),
            Port('ki_in', Port.IN),
            Port('kd_in', Port.IN),
            Port('actuation', Port.OUT),
        ]
        super().__init__(
            identifier=identifier,
            ports=ports,
            # Run only when both mandatory inputs carry fresh data; the gain
            # ports are optional and do not gate scheduling.
            scheduling_condition=lambda part: (
                part.get_port('time').is_updated()
                and part.get_port('error').is_updated()
            ),
        )
        self.integral = initial_integral
        self.previous_error = initial_error
        self.kp = initial_kp
        self.ki = initial_ki
        self.kd = initial_kd
        # None until the first run; marks that no time step is available yet.
        self.previous_time: Optional[float] = None
        self.integral_min = integral_min
        self.integral_max = integral_max

    def _refresh_gains(self):
        """Adopt new gain values from any gain port that has been updated."""
        for gain_name, port_id in (('kp', 'kp_in'), ('ki', 'ki_in'), ('kd', 'kd_in')):
            gain_port = self.get_port(port_id)
            if gain_port.is_updated():
                setattr(self, gain_name, gain_port.get())

    @staticmethod
    def _step_for_error(error, t_step):
        """
        Return `t_step` in a numeric form that can be combined with `error`.

        Non-integral error types keep the original coercion
        `type(error)(t_step)`, so error types that refuse to mix with the
        time type still work. Integral errors are special-cased: coercing a
        fractional step to an integer type truncates it (0.1 becomes 0),
        which would freeze the integral and make the derivative divide by
        zero. Integers combine with the usual numeric types unchanged, so
        the step is returned as-is.
        """
        if isinstance(error, numbers.Integral):
            return t_step
        return type(error)(t_step)

    def behavior(self):
        """
        Calculates the PID output.

        Gains are refreshed first from any updated gain port. On the first
        run (no previous time recorded) only the proportional term is
        emitted. On later runs with a strictly positive time step, the
        integral is accumulated and clamped, the derivative is computed, and
        the full PID sum is written to `actuation`. When the time step is
        not positive no actuation is written. In every case the current
        error and time are stored for the next run.
        """
        self._refresh_gains()

        current_time = self.get_port('time').get()
        error = self.get_port('error').get()

        if self.previous_time is None:
            # First run: no time step exists yet, so I and D contribute
            # nothing and only the P-term is output.
            self.get_port('actuation').set(self.kp * error)
        else:
            t_step = current_time - self.previous_time
            if t_step > 0:
                dt = self._step_for_error(error, t_step)
                self.integral += error * dt

                # Clamp the integral term to prevent windup.
                if self.integral_max is not None:
                    self.integral = min(self.integral, self.integral_max)
                if self.integral_min is not None:
                    self.integral = max(self.integral, self.integral_min)

                derivative = (error - self.previous_error) / dt
                actuation = (
                    self.kp * error
                    + self.ki * self.integral
                    + self.kd * derivative
                )
                self.get_port('actuation').set(actuation)
            # A non-positive step leaves the output untouched.

        # Update state for the next step.
        self.previous_error = error
        self.previous_time = current_time
class Control_Element(Part):
    """
    Structural part that combines an error-calculation operator with a PID
    controller into one reusable feedback-control element.

    The inner `error` operator computes `setpoint - measure`; its output
    feeds the inner `pid` part, whose actuation is exposed on this part's
    output port.

    Ports:
        - `time` (IN): The current simulation time, passed to the PID.
        - `setpoint` (IN): The desired value for the system.
        - `measure` (IN): The current measured value from the system.
        - `kp_in` (IN): For dynamically updating the proportional gain.
        - `ki_in` (IN): For dynamically updating the integral gain.
        - `kd_in` (IN): For dynamically updating the derivative gain.
        - `actuation` (OUT): The final control signal.
    """

    def __init__(self, identifier: str, execution_strategy: callable,
                 initial_kp: float, initial_ki: float, initial_kd: float,
                 initial_integral: float = 0.0, initial_error: float = 0.0,
                 integral_min: Optional[float] = None,
                 integral_max: Optional[float] = None):
        """
        Build the inner parts and wire them to this part's ports.

        Args:
            identifier: The unique name for this part.
            execution_strategy: The strategy for executing inner parts
                (e.g., sequential_execution).
            initial_kp: The initial proportional gain for the inner PID.
            initial_ki: The initial integral gain for the inner PID.
            initial_kd: The initial derivative gain for the inner PID.
            initial_integral: The starting value for the PID's integral term.
            initial_error: The starting error for the PID's derivative term.
            integral_min: The minimum value for the PID's integral clamp.
            integral_max: The maximum value for the PID's integral clamp.
        """
        def difference(setpoint, measure):
            # Error signal: desired value minus measured value.
            return setpoint - measure

        input_ids = ('time', 'setpoint', 'measure', 'kp_in', 'ki_in', 'kd_in')
        ports = [Port(port_id, Port.IN) for port_id in input_ids]
        ports.append(Port('actuation', Port.OUT))

        controller = PID(
            'pid',
            initial_kp,
            initial_ki,
            initial_kd,
            initial_integral,
            initial_error,
            integral_min=integral_min,
            integral_max=integral_max,
        )
        subtractor = Operator('error', 2, difference)

        super().__init__(
            identifier=identifier,
            ports=ports,
            parts={'pid': controller, 'error': subtractor},
            execution_strategy=execution_strategy,
        )

        # Look the children up through the part itself before wiring.
        pid_part = self.get_part('pid')
        error_part = self.get_part('error')

        # Parent IN port -> child IN port, in wiring order.
        downward_links = (
            ('time', pid_part, 'time'),
            ('setpoint', error_part, 'in_0'),
            ('measure', error_part, 'in_1'),
            ('kp_in', pid_part, 'kp_in'),
            ('ki_in', pid_part, 'ki_in'),
            ('kd_in', pid_part, 'kd_in'),
        )
        for outer_id, child, inner_id in downward_links:
            self.connect(self.get_port(outer_id), child.get_port(inner_id))

        # Child -> child: the computed error drives the controller.
        self.connect(error_part.get_port('out'), pid_part.get_port('error'))

        # Child OUT port -> parent OUT port.
        self.connect(pid_part.get_port('actuation'), self.get_port('actuation'))