Parts API

These are the built-in event parts.

class ml.parts.Control_Element(identifier: str, execution_strategy: callable, initial_kp: float, initial_ki: float, initial_kd: float, initial_integral: float = 0.0, initial_error: float = 0.0, integral_min: float | None = None, integral_max: float | None = None)[source]

Bases: Part

A structural part that composes a PID controller and an error calculation operator to form a complete, standard control loop element.

This part is a reusable block for feedback control systems.

Ports:
  • time (IN): The current simulation time, passed to the PID.

  • setpoint (IN): The desired value for the system.

  • measure (IN): The current measured value from the system.

  • kp_in (IN): For dynamically updating the proportional gain.

  • ki_in (IN): For dynamically updating the integral gain.

  • kd_in (IN): For dynamically updating the derivative gain.

  • actuation (OUT): The final control signal to be sent to the plant.

class ml.parts.EventToDataSynchronizer(identifier: str, input_queue_id: str, output_port_id: str)[source]

Bases: Part

A generic behavioral part that synchronizes an asynchronous event to the synchronous dataflow domain.

It consumes an event from a specified input EventQueue, extracts its payload, and sets that payload on a specified output Port. This is a common pattern for converting triggers (like a timer tick) into data that can be processed by dataflow-driven parts in the next simulation step.

behavior()[source]

Pops a single event from the input queue and sets its payload on the output port.

This method is called by the engine when the part is scheduled to run, which by default happens when its input event queue is not empty.

class ml.parts.Operator(identifier: str, inputs: int, op: callable)[source]

Bases: Part

A generic, stateless behavioral part that applies a user-defined function to its inputs.

It waits for all of its input ports to receive new data, then calls the provided operation op with the input values and sets the result on its single output port.

Ports:
  • in_0, in_1, …: A number of input ports defined by inputs.

  • out: A single output port.

behavior()[source]

Collects values from all input ports, applies the user-defined operation to them, and sets the result on the output port.

The part’s custom scheduling condition ensures this method only runs when all input ports have been updated, so no is_updated() checks are needed here.

class ml.parts.PID(identifier: str, initial_kp: float, initial_ki: float, initial_kd: float, initial_integral: float = 0.0, initial_error: float = 0.0, integral_min: float | None = None, integral_max: float | None = None)[source]

Bases: Part

A behavioral part implementing a standard PID (Proportional-Integral-Derivative) controller.

It calculates an actuation value based on an error signal and the passage of time. The time step dt is calculated dynamically from successive values on the time input port.

Ports:
  • time (IN): Receives the current simulation time.

  • error (IN): Receives the current error signal (setpoint - measurement).

  • kp_in (IN): Receives updates for the proportional gain.

  • ki_in (IN): Receives updates for the integral gain.

  • kd_in (IN): Receives updates for the derivative gain.

  • actuation (OUT): Outputs the calculated control signal.

behavior()[source]

Calculates the PID output. The scheduler ensures both time and error ports are updated before this method is called. It handles the initial state gracefully and calculates dt dynamically. Gain values are updated whenever their respective ports receive new data.