Strategies API

These are the built-in execution strategies.

class ml.strategies.Execution(parallelization_condition: Callable[[Part], bool], mode: ExecutionMode, log_queue: Queue | None = None, error_queue: Queue | None = None, name: str | None = None)

Bases: object

A stateful execution strategy that orchestrates the execution of scheduled parts.

Based on a parallelization_condition, it separates parts into sequential and parallel groups. It executes the sequential group first, then concurrently executes the parallel group using either persistent processes (for performance) or temporary threads.

classmethod sequential(name: str | None = None) Execution

A factory method to create a purely sequential execution strategy.

Parameters:

name – An optional descriptive name for the strategy instance.

Returns:

An Execution object configured to run all parts sequentially.

ml.strategies.all_input_ports_updated(part: Part) bool

Schedules the part if all its input ports are updated.

Parameters:

part – The part to check.

Returns:

True if all input ports are updated, False otherwise.

ml.strategies.all_updated(part: Part, identifiers: str | List[str]) bool

Schedules the part if the port(s) with the specified identifier(s) is/are updated.

Parameters:
  • part – The part to check.

  • identifiers – The identifier(s) of the port(s) to check. Can be a single string or a list of strings. If a list is provided, returns True only if ALL specified ports are updated.

Returns:

True if the port(s) is/are updated, False otherwise.

ml.strategies.execute(part: Part, part_full_id_for_logging: str, creator_thread_name: str, parent_log_entry: dict | None, tick: int | None = None, log_queue: Queue | None = None, error_queue: Queue | None = None)

A wrapper to execute a part’s logic and handle logging and errors. This function is the target for temporary threads spawned by an Execution strategy.

It handles setting up the execution context (e.g., for data types), configuring worker-specific settings in process mode, logging execution boundaries for tracing, and robustly handling and reporting exceptions.

Parameters:
  • part – The Part instance to be executed. This can be a regular Part (in ‘thread’ mode) or a Blueprint proxy (in ‘process’ mode).

  • part_full_id_for_logging – The fully qualified identifier string for the part, used for logging purposes.

  • creator_thread_name – The name of the parent thread that initiated this execution.

  • parent_log_entry – The log entry dictionary from the parent’s execution strategy, used to establish a parent-child relationship in trace analysis.

  • tick – The current simulation tick number.

  • log_queue – An optional multiprocessing.Queue for sending logs back to the main process’s tracer.

  • error_queue – An optional multiprocessing.Queue for sending exceptions back to the main process.

ml.strategies.time_updated(part: Part) bool

Schedules the part if the time port is updated.

Parameters:

part – The part to check.

Returns:

True if the time port is updated, False otherwise.