Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions src/smpclient/transport/ble.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,14 @@ def __init__(self, winrt: WinRTClientArgs = {}) -> None:
self._max_write_without_response_size = 20
"""Initially set to BLE minimum; may be mutated by the `connect()` method."""

self._using_windows_bonded_fallback = False
"""Tracks if we're using Windows bonded device fallback (MAC address connection)."""

logger.debug(f"Initialized {self.__class__.__name__}")

@override
async def connect(self, address: str, timeout_s: float) -> None:
self._using_windows_bonded_fallback = False
logger.debug(f"Scanning for {address=}")
device: BLEDevice | None = (
await BleakScanner.find_device_by_address(address, timeout=timeout_s)
Expand All @@ -98,6 +102,16 @@ async def connect(self, address: str, timeout_s: float) -> None:
winrt=self._winrt,
disconnected_callback=self._set_disconnected_event,
)
elif sys.platform == "win32" and MAC_ADDRESS_PATTERN.match(address):
logger.warning(f"WinBLE : Device '{address}' not found via BLE scan; trying MAC address connection")
self._client = BleakClient(
address,
services=(str(SMP_SERVICE_UUID),),
winrt=self._winrt,
disconnected_callback=self._set_disconnected_event,
)
# IMPORTANT: Do not set self._client._backend._device_info here, it will cancel any service discovery
self._using_windows_bonded_fallback = True
else:
raise SMPBLETransportDeviceNotFound(f"Device '{address}' not found")

Expand All @@ -106,6 +120,19 @@ async def connect(self, address: str, timeout_s: float) -> None:
self._disconnected_event.clear()
logger.debug(f"Connected to {device=}")

# Windows bonded devices need service discovery; wait for it to complete
if self._using_windows_bonded_fallback:
logger.debug("WinBLE : Waiting for services to populate")
max_retries = 50
for attempt in range(max_retries):
if self._client.services.services:
logger.debug(f"WinBLE : Services populated on attempt {attempt + 1}: {list(self._client.services.services.keys())}")
break
await asyncio.sleep(0.1)
else:
logger.error(f"WinBLE : Service discovery failed. Services still empty after {max_retries} attempts")
raise SMPBLETransportNotSMPServer("WinBLE : Service discovery failed - no services found.")

smp_characteristic = self._client.services.get_characteristic(SMP_CHARACTERISTIC_UUID)
if smp_characteristic is None:
raise SMPBLETransportNotSMPServer("Missing the SMP characteristic UUID.")
Expand Down
19 changes: 17 additions & 2 deletions tests/test_smp_ble_transport.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Tests for `SMPBLETransport`."""

import asyncio
import sys
from typing import cast
from unittest.mock import AsyncMock, MagicMock, patch
from uuid import UUID
Expand Down Expand Up @@ -99,10 +100,24 @@ async def test_connect(
)
mock_find_device_by_address.reset_mock()

# assert that it raises an exception if the device is not found
# assert that it raises an exception if the device is not found (non-MAC or non-Windows)
mock_find_device_by_address.return_value = None
mock_find_device_by_name.return_value = None
with pytest.raises(SMPBLETransportDeviceNotFound):
await SMPBLETransport().connect("00:00:00:00:00:00", 1.0)
await SMPBLETransport().connect("device name", 1.0)
mock_find_device_by_name.reset_mock()

# restore mock return values for next tests
mock_find_device_by_address.return_value = BLEDevice("address", "name", None)
mock_find_device_by_name.return_value = BLEDevice("address", "name", None)

# assert that on Windows with MAC, the bonded-device fallback is attempted
with patch("sys.platform", "win32"):
mock_find_device_by_address.return_value = None
t = SMPBLETransport()
await t.connect("00:00:00:00:00:00", 1.0)
# Verify that BleakClient was called with the MAC address (fallback attempt)
assert t._client is not None
mock_find_device_by_address.reset_mock()

# assert that connect is awaited
Expand Down
Loading