from typing import Set, Optional
import threading
from ml.engine import Part
from .tracer import Tracer
from ml import data
from . import conf as ml_conf
[docs]
def execute(part: Part, creator_thread_name: str, parent_log_entry: Optional[dict], tick: Optional[int] = None):
"""
A wrapper to log the start and end of a part's execution within a
dedicated thread spawned by a parallel execution strategy.
This also sets the thread-local decimal precision.
"""
# Set thread-local settings.
data.set_context()
part.tick = tick
details = {
ml_conf.LOG_DETAIL_KEY_THREAD_NAME: threading.current_thread().name,
ml_conf.LOG_DETAIL_KEY_CREATOR_THREAD: creator_thread_name,
}
if parent_log_entry:
details[ml_conf.TRACER_KEY_PARENT_EVENT_ID] = parent_log_entry[ml_conf.TRACER_KEY_DETAILS][ml_conf.TRACER_KEY_EVENT_ID]
Tracer.log(ml_conf.LOG_LEVEL_EXECUTION, part.get_full_identifier(), ml_conf.LOG_EVENT_EXECUTION_THREAD_START, details, tick=tick)
part.execute()
Tracer.log(ml_conf.LOG_LEVEL_EXECUTION, part.get_full_identifier(), ml_conf.LOG_EVENT_EXECUTION_THREAD_END, {ml_conf.LOG_DETAIL_KEY_THREAD_NAME: threading.current_thread().name}, tick=tick)
[docs]
def sequential_execution(parent_part: Part, scheduled_parts: Set[Part], strategy_event: dict):
"""
A simple execution strategy that executes scheduled parts sequentially.
It iterates through the provided set of parts and calls the `execute()`
method on each one. The order of execution is not guaranteed due to
the nature of sets.
Args:
parent_part: The structural part that is invoking this strategy.
scheduled_parts: A set of parts that have been scheduled for execution.
strategy_event: The log entry for the strategy's start event (unused).
"""
# The tracer logs which parts are scheduled in the `__step` method.
for part in scheduled_parts:
# The `execute` method itself is instrumented with trace calls.
part.execute()