diff --git a/src/smpclient/transport/ble.py b/src/smpclient/transport/ble.py index 0cc2c3f..83acda5 100644 --- a/src/smpclient/transport/ble.py +++ b/src/smpclient/transport/ble.py @@ -80,10 +80,14 @@ def __init__(self, winrt: WinRTClientArgs = {}) -> None: self._max_write_without_response_size = 20 """Initially set to BLE minimum; may be mutated by the `connect()` method.""" + self._using_windows_bonded_fallback = False + """Tracks if we're using Windows bonded device fallback (MAC address connection).""" + logger.debug(f"Initialized {self.__class__.__name__}") @override async def connect(self, address: str, timeout_s: float) -> None: + self._using_windows_bonded_fallback = False logger.debug(f"Scanning for {address=}") device: BLEDevice | None = ( await BleakScanner.find_device_by_address(address, timeout=timeout_s) @@ -98,6 +102,16 @@ async def connect(self, address: str, timeout_s: float) -> None: winrt=self._winrt, disconnected_callback=self._set_disconnected_event, ) + elif sys.platform == "win32" and MAC_ADDRESS_PATTERN.match(address): + logger.warning(f"WinBLE : Device '{address}' not found via BLE scan; trying MAC address connection") + self._client = BleakClient( + address, + services=(str(SMP_SERVICE_UUID),), + winrt=self._winrt, + disconnected_callback=self._set_disconnected_event, + ) + # IMPORTANT: Do not set self._client._backend._device_info here, it will cancel any service discovery + self._using_windows_bonded_fallback = True else: raise SMPBLETransportDeviceNotFound(f"Device '{address}' not found") @@ -106,6 +120,19 @@ async def connect(self, address: str, timeout_s: float) -> None: self._disconnected_event.clear() logger.debug(f"Connected to {device=}") + # Windows bonded devices need service discovery; wait for it to complete + if self._using_windows_bonded_fallback: + logger.debug("WinBLE : Waiting for services to populate") + max_retries = 50 + for attempt in range(max_retries): + if self._client.services.services: + logger.debug(f"WinBLE : Services populated on attempt {attempt + 1}: {list(self._client.services.services.keys())}") + break + await asyncio.sleep(0.1) + else: + logger.error(f"WinBLE : Service discovery failed. Services still empty after {max_retries} attempts") + raise SMPBLETransportNotSMPServer("WinBLE : Service discovery failed - no services found.") + smp_characteristic = self._client.services.get_characteristic(SMP_CHARACTERISTIC_UUID) if smp_characteristic is None: raise SMPBLETransportNotSMPServer("Missing the SMP characteristic UUID.") diff --git a/tests/test_smp_ble_transport.py b/tests/test_smp_ble_transport.py index c451876..6e191a0 100644 --- a/tests/test_smp_ble_transport.py +++ b/tests/test_smp_ble_transport.py @@ -1,6 +1,7 @@ """Tests for `SMPBLETransport`.""" import asyncio +import sys from typing import cast from unittest.mock import AsyncMock, MagicMock, patch from uuid import UUID @@ -99,10 +100,24 @@ async def test_connect( ) mock_find_device_by_address.reset_mock() - # assert that it raises an exception if the device is not found + # assert that it raises an exception if the device is not found (non-MAC or non-Windows) mock_find_device_by_address.return_value = None + mock_find_device_by_name.return_value = None with pytest.raises(SMPBLETransportDeviceNotFound): - await SMPBLETransport().connect("00:00:00:00:00:00", 1.0) + await SMPBLETransport().connect("device name", 1.0) + mock_find_device_by_name.reset_mock() + + # restore mock return values for next tests + mock_find_device_by_address.return_value = BLEDevice("address", "name", None) + mock_find_device_by_name.return_value = BLEDevice("address", "name", None) + + # assert that on Windows with MAC, the bonded-device fallback is attempted + with patch("sys.platform", "win32"): + mock_find_device_by_address.return_value = None + t = SMPBLETransport() + await t.connect("00:00:00:00:00:00", 1.0) + # Verify that BleakClient was called with the MAC address (fallback attempt) + assert t._client is not None mock_find_device_by_address.reset_mock() # assert that connect is awaited