Source code for ml.event_sources

import time
import threading
from .engine import EventSource
from typing import Optional
from .enums import OnFullBehavior
from .tracer import Tracer
from . import conf

class Timer(EventSource):
    """An EventSource that emits a time event at a fixed interval.

    This timer is designed to be robust. The behavior when the simulation
    cannot keep up with the timer's rate is controlled by the `on_full`
    parameter.

    - `OnFullBehavior.FAIL` (default): A full queue will raise an exception,
      which is caught by the base `EventSource` run loop. This stops the
      timer and logs a clear error, which is the recommended mode for hard
      real-time simulations where every tick is critical.
    - `OnFullBehavior.DROP`: A full queue will cause the event to be dropped.
      The timer will continue running. This is useful for soft real-time
      simulations where occasional missed ticks are acceptable.
    - `OnFullBehavior.OVERWRITE`: A full queue will cause an existing event
      in the queue to be replaced with the new one. This is useful when only
      the latest value is important.
    """

    def __init__(self, identifier: str, interval_seconds: float,
                 duration_seconds: Optional[float] = None,
                 on_full: OnFullBehavior = OnFullBehavior.FAIL):
        """Create a timer.

        Args:
            identifier: Identifier forwarded to the `EventSource` base class.
            interval_seconds: Time between consecutive ticks; must be > 0.
            duration_seconds: Simulated time at which the timer stops itself.
                `None` means the timer runs until it is stopped externally.
            on_full: Policy forwarded to the `EventSource` base class for the
                full-queue case (see the class docstring).

        Raises:
            ValueError: If `interval_seconds` is not strictly positive.
        """
        super().__init__(identifier, on_full=on_full)
        if interval_seconds <= 0:
            raise ValueError(conf.ERROR_TIMER_INVALID_INTERVAL)
        self.__interval = interval_seconds
        self.__duration_seconds = duration_seconds
        self.__current_time = 0.0
        # Guards __current_time, which is written by the run loop and read
        # through get_current_time().
        self.__lock = threading.Lock()

    def __emit_tick(self, sim_time: float) -> bool:
        """Record and emit one tick, then apply the auto-stop rule.

        Args:
            sim_time: Simulated time of the tick being emitted.

        Returns:
            True if the configured duration has been reached (the timer has
            then been stopped and the caller must leave the run loop),
            False otherwise.
        """
        with self.__lock:
            self.__current_time = sim_time
        self.emit(sim_time)

        duration = self.__duration_seconds
        if duration is None or sim_time < duration:
            return False

        Tracer.log(conf.LOG_LEVEL_TIMER_DURATION_REACHED,
                   self.get_identifier(),
                   conf.LOG_EVENT_TIMER_DURATION_REACHED,
                   {"sim_time": sim_time})
        self.stop()
        return True

    def _run_loop(self) -> None:
        """A high-precision, drift-free timer loop.

        Emits a tick at t=0 and then one tick per interval. Every tick is
        scheduled against the start time of the loop rather than against the
        previous tick, so a late wake-up does not push later ticks back.
        """
        Tracer.log(conf.LOG_LEVEL_TIMER_LIFECYCLE,
                   self.get_identifier(),
                   conf.LOG_EVENT_RUN_LOOP_START,
                   {"interval": self.__interval,
                    "duration": self.__duration_seconds})

        # A monotonic clock is used for scheduling: wall-clock adjustments
        # (NTP steps, manual changes) must not stall the timer or trigger a
        # burst of ticks.
        start_time = time.monotonic()

        # Emit the initial t=0 event; a duration of 0 (or less) stops here.
        # As before, the run-loop-end event is not logged on this path.
        if self.__emit_tick(0.0):
            return

        tick_index = 0
        while self.is_running():
            tick_index += 1
            # Multiply instead of repeatedly adding the interval: repeated
            # addition accumulates rounding error (ten additions of 0.1 give
            # 0.9999999999999999), which made the duration check miss its
            # tick and emit one event too many.
            sim_time = tick_index * self.__interval

            sleep_duration = (start_time + sim_time) - time.monotonic()
            if sleep_duration > 0:
                time.sleep(sleep_duration)

            # The timer may have been stopped while this thread was asleep.
            if not self.is_running():
                break

            if self.__emit_tick(sim_time):
                break

        Tracer.log(conf.LOG_LEVEL_TIMER_LIFECYCLE,
                   self.get_identifier(),
                   conf.LOG_EVENT_RUN_LOOP_END)

    def get_current_time(self) -> float:
        """Gets the most recent time value generated by the timer.

        This method is thread-safe.

        Returns:
            float: The most recent time value generated by the timer
            (0.0 before the first tick).
        """
        with self.__lock:
            return self.__current_time