Source code for ml.strategies

from typing import Set, Optional
import threading
from ml.engine import Part
from .tracer import Tracer
from ml import data
from . import conf as ml_conf

def execute(
    part: Part,
    creator_thread_name: str,
    parent_log_entry: Optional[dict],
    tick: Optional[int] = None,
) -> None:
    """
    Run ``part`` in the current thread, tracing the thread's start and end.

    Intended as the target of a dedicated thread spawned by a parallel
    execution strategy. Before the part runs, ``data.set_context()`` is
    called to apply thread-local settings (per the original documentation
    this covers the thread-local decimal precision) and ``part.tick`` is
    overwritten with ``tick``.

    Args:
        part: The part to execute. Its ``tick`` attribute is assigned here.
        creator_thread_name: Name of the thread that spawned this one; it is
            recorded in the details of the start event.
        parent_log_entry: Optional log entry of the parent event. When
            truthy, its event id (read from the entry's details) is stored
            in the start event's details as the parent event id.
        tick: Optional tick value assigned to ``part.tick`` and forwarded to
            both ``Tracer.log`` calls.

    Raises:
        Any exception raised by ``part.execute()`` propagates to the caller
        after the thread-end event has been logged.
    """
    # Set thread-local settings.
    data.set_context()
    part.tick = tick

    details = {
        ml_conf.LOG_DETAIL_KEY_THREAD_NAME: threading.current_thread().name,
        ml_conf.LOG_DETAIL_KEY_CREATOR_THREAD: creator_thread_name,
    }
    if parent_log_entry:
        # Link this thread's start event to the event that caused it.
        parent_details = parent_log_entry[ml_conf.TRACER_KEY_DETAILS]
        details[ml_conf.TRACER_KEY_PARENT_EVENT_ID] = parent_details[ml_conf.TRACER_KEY_EVENT_ID]

    Tracer.log(
        ml_conf.LOG_LEVEL_EXECUTION,
        part.get_full_identifier(),
        ml_conf.LOG_EVENT_EXECUTION_THREAD_START,
        details,
        tick=tick,
    )
    try:
        part.execute()
    finally:
        # Always pair the start event with an end event, even when the part
        # raises; otherwise the trace shows a thread that never finished.
        Tracer.log(
            ml_conf.LOG_LEVEL_EXECUTION,
            part.get_full_identifier(),
            ml_conf.LOG_EVENT_EXECUTION_THREAD_END,
            {ml_conf.LOG_DETAIL_KEY_THREAD_NAME: threading.current_thread().name},
            tick=tick,
        )
def sequential_execution(parent_part: Part, scheduled_parts: Set[Part], strategy_event: dict):
    """
    Execute every scheduled part one after another in the calling thread.

    Each element of ``scheduled_parts`` has its ``execute()`` method invoked
    exactly once. Because the parts arrive as a set, no particular execution
    order is guaranteed. An exception raised by a part aborts the loop and
    propagates to the caller.

    Args:
        parent_part: The structural part that is invoking this strategy
            (not used by this strategy).
        scheduled_parts: The set of parts that have been scheduled for
            execution.
        strategy_event: The log entry for the strategy's start event
            (not used by this strategy).
    """
    # Scheduling is already traced by the caller, and `execute()` carries its
    # own trace instrumentation, so no logging is needed here.
    for scheduled in scheduled_parts:
        scheduled.execute()