Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions isotp/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from isotp.can_message import CanMessage
from isotp.tools import Timer, FiniteByteGenerator
from isotp.sleep_ns import precise_sleep
import isotp.address
import isotp.errors

Expand Down Expand Up @@ -366,7 +367,7 @@ def __init__(self) -> None:
self.listen_mode = False
self.blocking_send = False
self.logger_name = TransportLayer.LOGGER_NAME
self.wait_func = time.sleep
self.wait_func = precise_sleep

def set(self, key: str, val: Any, validate: bool = True) -> None:
param_alias: Dict[str, str] = {
Expand Down Expand Up @@ -1611,7 +1612,7 @@ def _relay_thread_fn(self) -> None:
else: # No data received. Sleep if user is not blocking
if not self.events.stop_requested.is_set():
if not self.blocking_rxfn or diff < rx_timeout * 0.5:
time.sleep(max(0, min(self.sleep_time(), rx_timeout - diff)))
self.params.wait_func(max(0, min(self.sleep_time(), rx_timeout - diff)))

def _main_thread_fn(self) -> None:
"""Internal function executed by the main thread. """
Expand Down
67 changes: 67 additions & 0 deletions isotp/sleep_ns.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import sys
import time
import ctypes
import ctypes.util


def _precise_sleep_windows(duration_sec: float) -> None:
"""High-precision sleep implementation for Windows"""
kernel32 = ctypes.windll.kernel32 # type: ignore[attr-defined]

# Create a waitable timer
timer = kernel32.CreateWaitableTimerExW(
None, None, 0x00000002, 0x1F0003
)

# Time unit is 100 nanoseconds, negative value means relative time
delay = ctypes.c_longlong(int(-duration_sec * 10000000))

# Set the timer
kernel32.SetWaitableTimer(
timer, ctypes.byref(delay), 0, None, None, False
)

# Wait for the timer to trigger
kernel32.WaitForSingleObject(timer, 0xFFFFFFFF)

# Close the handle
kernel32.CloseHandle(timer)

def _precise_sleep_linux(duration_sec: float)-> None:
"""High-precision sleep implementation for Linux"""
try:
# Try to use clock_nanosleep (most precise)
librt = ctypes.CDLL(ctypes.util.find_library("rt"), use_errno=True)

class timespec(ctypes.Structure):
_fields_ = [("tv_sec", ctypes.c_long),
("tv_nsec", ctypes.c_long)]

req = timespec()
req.tv_sec = int(duration_sec)
req.tv_nsec = int((duration_sec - req.tv_sec) * 1e9)

CLOCK_MONOTONIC = 1
result = librt.clock_nanosleep(
CLOCK_MONOTONIC, 0, ctypes.byref(req), None
)

if result != 0:
errno = ctypes.get_errno()
raise OSError(errno, f"clock_nanosleep failed with errno {errno}")

except Exception as e:
# If clock_nanosleep fails, fallback to select
if duration_sec > 0:
time.sleep(duration_sec)


def precise_sleep(duration_sec: float)-> None:
"""
Cross-platform high-precision sleep function
:param duration_sec: Sleep time in seconds (can be float)
"""
if sys.platform == 'win32':
_precise_sleep_windows(duration_sec)
else:
_precise_sleep_linux(duration_sec)