from .engine import Part, Port, EventQueue
from typing import Optional
[docs]
class EventToDataSynchronizer(Part):
"""
A generic behavioral part that synchronizes an asynchronous event to the
synchronous dataflow domain.
It consumes an event from a specified input EventQueue, extracts its
payload, and sets that payload on a specified output Port. This is a
common pattern for converting triggers (like a timer tick) into data
that can be processed by dataflow-driven parts in the next simulation step.
"""
def __init__(self, identifier: str, input_queue_id: str, output_port_id: str):
"""
Initializes the EventToDataSynchronizer.
Args:
identifier: The unique name for this part.
input_queue_id: The identifier for the input EventQueue.
output_port_id: The identifier for the output Port.
"""
# One input event queue
event_queues = [EventQueue(input_queue_id, EventQueue.IN, size=1)]
# A single output data port
ports = [Port(output_port_id, Port.OUT)]
super().__init__(identifier=identifier, ports=ports, event_queues=event_queues)
# Store the IDs for use in the behavior method
self._input_queue_id = input_queue_id
self._output_port_id = output_port_id
[docs]
def behavior(self):
"""
Pops a single event from the input queue and sets its payload on the
output port.
This method is called by the engine when the part is scheduled to run,
which by default happens when its input event queue is not empty.
"""
event_queue = self.get_event_queue(self._input_queue_id)
if not event_queue.is_empty():
payload = event_queue.pop()
# Set the payload on the single output port
self.get_port(self._output_port_id).set(payload)
[docs]
class Operator(Part):
"""
A generic, stateless behavioral part that applies a user-defined function
to its inputs.
It waits for all of its input ports to receive new data, then calls the
provided operation `op` with the input values and sets the result on its
single output port.
Ports:
- `in_0`, `in_1`, ...: A number of input ports defined by `inputs`.
- `out`: A single output port.
"""
def __init__(self, identifier: str, inputs: int, op: callable):
"""
Initializes the Operator.
Args:
identifier: The unique name for this part.
inputs: The number of input ports to create.
op: A callable that takes `inputs` arguments and returns a single value.
"""
ports = [Port('out', Port.OUT)]
for i in range(inputs):
ports.append(Port(f'in_{i}', Port.IN))
super().__init__(
identifier=identifier,
ports=ports,
scheduling_condition=lambda part: all(p.is_updated() for p in part.get_ports(Port.IN))
)
self.inputs = inputs
self.op = op
[docs]
def behavior(self):
"""
Collects values from all input ports, applies the user-defined
operation to them, and sets the result on the output port.
The part's custom scheduling condition ensures this method only runs
when all input ports have been updated, so no `is_updated()` checks
are needed here.
"""
input_values = []
for i in range(self.inputs):
input_values.append(self.get_port(f'in_{i}').get())
out_payload = self.op(*input_values)
self.get_port('out').set(out_payload)
[docs]
class PID(Part):
"""
A behavioral part implementing a standard PID (Proportional-Integral-Derivative)
controller.
It calculates an actuation value based on an error signal and the passage
of time. The time step `dt` is calculated dynamically from successive
values on the `time` input port.
Ports:
- `time` (IN): Receives the current simulation time.
- `error` (IN): Receives the current error signal (setpoint - measurement).
- `kp_in` (IN): Receives updates for the proportional gain.
- `ki_in` (IN): Receives updates for the integral gain.
- `kd_in` (IN): Receives updates for the derivative gain.
- `actuation` (OUT): Outputs the calculated control signal.
"""
def __init__(self, identifier: str, initial_kp: float, initial_ki: float, initial_kd: float, initial_integral: float = 0.0, initial_error: float = 0.0, integral_min: Optional[float] = None, integral_max: Optional[float] = None):
"""
Initializes the PID controller.
Args:
identifier: The unique name for this part.
initial_kp: The initial proportional gain.
initial_ki: The initial integral gain.
initial_kd: The initial derivative gain.
initial_integral: The starting value for the integral term.
initial_error: The error value from the previous step, used for the
first derivative calculation.
integral_min: The minimum value to which the integral term will be clamped.
integral_max: The maximum value to which the integral term will be clamped.
"""
ports = [
Port('time', Port.IN),
Port('error', Port.IN),
Port('kp_in', Port.IN),
Port('ki_in', Port.IN),
Port('kd_in', Port.IN),
Port('actuation', Port.OUT)
]
super().__init__(
identifier=identifier,
ports=ports,
scheduling_condition=lambda part: part.get_port('time').is_updated() and part.get_port('error').is_updated()
)
self.integral = initial_integral
self.previous_error = initial_error
self.kp = initial_kp
self.ki = initial_ki
self.kd = initial_kd
self.previous_time: float | None = None
self.integral_min = integral_min
self.integral_max = integral_max
[docs]
def behavior(self):
"""
Calculates the PID output. The scheduler ensures both `time` and `error`
ports are updated before this method is called. It handles the initial
state gracefully and calculates `dt` dynamically. Gain values are updated
whenever their respective ports receive new data.
"""
# Update gains if new values have been provided (they are optional)
kp_port = self.get_port('kp_in')
if kp_port.is_updated():
self.kp = kp_port.get()
ki_port = self.get_port('ki_in')
if ki_port.is_updated():
self.ki = ki_port.get()
kd_port = self.get_port('kd_in')
if kd_port.is_updated():
self.kd = kd_port.get()
# The scheduler guarantees that 'time' and 'error' are updated.
current_time = self.get_port('time').get()
error = self.get_port('error').get()
if self.previous_time is None:
# First run: only P-term is active, I and D are zero.
# Set a default output to satisfy the dataflow contract.
actuation = self.kp * error
self.get_port('actuation').set(actuation)
else:
# Subsequent runs: full PID calculation.
t_step = current_time - self.previous_time
if t_step > 0:
self.integral += error * type(error)(t_step)
# Clamp the integral term to prevent windup
if self.integral_max is not None:
self.integral = min(self.integral, self.integral_max)
if self.integral_min is not None:
self.integral = max(self.integral, self.integral_min)
derivative = (error - self.previous_error) / type(error)(t_step)
actuation = self.kp * error + self.ki * self.integral + self.kd * derivative
self.get_port('actuation').set(actuation)
# If t_step is not > 0, we don't set a new actuation value.
# Update state for the next step
self.previous_error = error
self.previous_time = current_time
[docs]
class Control_Element(Part):
"""
A structural part that composes a PID controller and an error calculation
operator to form a complete, standard control loop element.
This part is a reusable block for feedback control systems.
Ports:
- `time` (IN): The current simulation time, passed to the PID.
- `setpoint` (IN): The desired value for the system.
- `measure` (IN): The current measured value from the system.
- `kp_in` (IN): For dynamically updating the proportional gain.
- `ki_in` (IN): For dynamically updating the integral gain.
- `kd_in` (IN): For dynamically updating the derivative gain.
- `actuation` (OUT): The final control signal to be sent to the plant.
"""
def __init__(self, identifier: str, execution_strategy: callable, initial_kp: float, initial_ki: float, initial_kd: float, initial_integral: float = 0.0, initial_error: float = 0.0, integral_min: Optional[float] = None, integral_max: Optional[float] = None):
"""
Initializes the Control_Element.
Args:
identifier: The unique name for this part.
execution_strategy: The strategy for executing inner parts (e.g., sequential_execution).
initial_kp: The initial proportional gain for the inner PID.
initial_ki: The initial integral gain for the inner PID.
initial_kd: The initial derivative gain for the inner PID.
initial_integral: The starting value for the PID's integral term.
initial_error: The starting error for the PID's derivative term.
integral_min: The minimum value for the PID's integral clamp.
integral_max: The maximum value for the PID's integral clamp.
"""
ports = [
Port('time', Port.IN),
Port('setpoint', Port.IN),
Port('measure', Port.IN),
Port('kp_in', Port.IN),
Port('ki_in', Port.IN),
Port('kd_in', Port.IN),
Port('actuation', Port.OUT)
]
parts = {
'pid' : PID('pid', initial_kp, initial_ki, initial_kd, initial_integral, initial_error, integral_min=integral_min, integral_max=integral_max),
'error' : Operator('error', 2, lambda in_0, in_1: in_0 - in_1)
}
super().__init__(
identifier=identifier,
ports=ports,
parts=parts,
execution_strategy=execution_strategy
)
# Get references to inner parts for clarity
pid_part = self.get_part('pid')
error_part = self.get_part('error')
# --- Wire the components together ---
# Downward connections from parent's IN ports to children's IN ports
self.connect(self.get_port('time'), pid_part.get_port('time'))
self.connect(self.get_port('setpoint'), error_part.get_port('in_0'))
self.connect(self.get_port('measure'), error_part.get_port('in_1'))
self.connect(self.get_port('kp_in'), pid_part.get_port('kp_in'))
self.connect(self.get_port('ki_in'), pid_part.get_port('ki_in'))
self.connect(self.get_port('kd_in'), pid_part.get_port('kd_in'))
# Peer-to-peer connection between children
self.connect(error_part.get_port('out'), pid_part.get_port('error'))
# Upward connection from child's OUT port to parent's OUT port
self.connect(pid_part.get_port('actuation'), self.get_port('actuation'))