import time
import threading
from .engine import EventSource
from typing import Optional
from .enums import OnFullBehavior
from .tracer import Tracer
from . import conf
[docs]
class Timer(EventSource):
"""An EventSource that emits a time event at a fixed interval.
This timer is designed to be robust. The behavior when the simulation
cannot keep up with the timer's rate is controlled by the `on_full`
parameter.
- `OnFullBehavior.FAIL` (default): A full queue will raise an exception,
which is caught by the base `EventSource` run loop. This stops the timer
and logs a clear error, which is the recommended mode for hard real-time
simulations where every tick is critical.
- `OnFullBehavior.DROP`: A full queue will cause the event to be dropped.
The timer will continue running. This is useful for soft real-time
simulations where occasional missed ticks are acceptable.
- `OnFullBehavior.OVERWRITE`: A full queue will cause an existing event in
the queue to be replaced with the new one. This is useful when only the
latest value is important.
"""
def __init__(self, identifier: str, interval_seconds: float, duration_seconds: Optional[float] = None, on_full: OnFullBehavior = OnFullBehavior.FAIL):
super().__init__(identifier, on_full=on_full)
if interval_seconds <= 0:
raise ValueError(conf.ERROR_TIMER_INVALID_INTERVAL)
self.__interval = interval_seconds
self.__duration_seconds = duration_seconds
self.__current_time = 0.0
self.__lock = threading.Lock()
def _run_loop(self):
"""A high-precision, drift-free timer loop."""
Tracer.log(conf.LOG_LEVEL_TIMER_LIFECYCLE, self.get_identifier(), conf.LOG_EVENT_RUN_LOOP_START, {"interval": self.__interval, "duration": self.__duration_seconds})
start_wall_time = time.time()
sim_time = 0.0
# Emit the initial t=0 event
with self.__lock:
self.__current_time = sim_time
self.emit(sim_time)
# Immediately check for auto-stop if duration is 0 or very small
if self.__duration_seconds is not None and sim_time >= self.__duration_seconds:
Tracer.log(conf.LOG_LEVEL_TIMER_DURATION_REACHED, self.get_identifier(), conf.LOG_EVENT_TIMER_DURATION_REACHED, {"sim_time": sim_time})
self.stop()
return
while self.is_running():
sim_time += self.__interval
next_tick_wall_time = start_wall_time + sim_time
sleep_duration = next_tick_wall_time - time.time()
if sleep_duration > 0:
time.sleep(sleep_duration)
if not self.is_running():
break
with self.__lock:
self.__current_time = sim_time
self.emit(sim_time)
# Check for auto-stop condition
if self.__duration_seconds is not None and sim_time >= self.__duration_seconds:
Tracer.log(conf.LOG_LEVEL_TIMER_DURATION_REACHED, self.get_identifier(), conf.LOG_EVENT_TIMER_DURATION_REACHED, {"sim_time": sim_time})
self.stop()
break
Tracer.log(conf.LOG_LEVEL_TIMER_LIFECYCLE, self.get_identifier(), conf.LOG_EVENT_RUN_LOOP_END)
[docs]
def get_current_time(self) -> float:
"""
Gets the most recent time value generated by the timer.
This method is thread-safe.
Returns:
float: The most recent time value emitted by the timer.
"""
with self.__lock:
return self.__current_time