diff --git a/CHANGELOG.md b/CHANGELOG.md index caf703d3..e33c3b61 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,5 @@ ## 2.1.0 +* Add optional `queueId` parameter to all APIs * Android: add `legacy` to `AndroidOptions` — set `legacy: true` to scan legacy BLE 4.x advertisements (e.g. ESP32) on API 26+; default (`null`/`false`) keeps extended-advertisement scanning unchanged from prior releases ## 2.0.4 diff --git a/README.md b/README.md index 096d5ca9..8d59a148 100644 --- a/README.md +++ b/README.md @@ -557,6 +557,13 @@ If you want to parallelize commands between multiple devices, you can set: UniversalBle.queueType = QueueType.perDevice; ``` +You can have separate queues by passing an optional `queueId`. Commands with the same `queueId` are serialized together, but run in parallel with both `QueueType.perDevice` and `QueueType.global`: + +```dart +UniversalBle.write(deviceId, service, char, value1, queueId: '1'); +UniversalBle.write(deviceId, service, char, value2, queueId: '2'); +``` + You can also completely disable the queue and batch all commands, even for the same device, by using: ```dart @@ -575,13 +582,20 @@ UniversalBle.onQueueUpdate = (String id, int remainingItems) { }; ``` -To clear the queue: +To clear a queue: ```dart - /// Use [BleCommandQueue.globalQueueId] to clear the global queue. - /// To clear the queue of a specific device, use `deviceId` as [id]. - /// If no [id] is provided, all queues will be cleared. - UniversalBle.clearQueue(BleCommandQueue.globalQueueId); +// Clear global queue +UniversalBle.clearQueue(BleCommandQueue.globalQueueId); + +// Clear a per-device queue (when queueType is perDevice) +UniversalBle.clearQueue(deviceId); + +// Clear a custom queue (same string passed as queueId to read/write/etc.) +UniversalBle.clearQueue('customQueueId'); + +// Clear all queues +UniversalBle.clearQueue(); ``` ## Timeout @@ -1025,6 +1039,7 @@ To opt in, declare the `Uses Bluetooth LE accessories` background mode. After en ... ``` + Notes: - Without the `bluetooth-central` background mode, `CBCentralManager` is created lazily on the first central BLE API call and state restoration is disabled. diff --git a/example/macos/Podfile.lock b/example/macos/Podfile.lock index 66748eaa..7eeecf30 100644 --- a/example/macos/Podfile.lock +++ b/example/macos/Podfile.lock @@ -16,7 +16,7 @@ EXTERNAL SOURCES: SPEC CHECKSUMS: FlutterMacOS: d0db08ddef1a9af05a5ec4b724367152bb0500b1 - universal_ble: a322ebabee64f0ec27a313e89a5dd6967f37a60f + universal_ble: d0c3d7347d16c82746288e13ed2f1368ecf8e716 PODFILE CHECKSUM: 54d867c82ac51cbd61b565781b9fada492027009 diff --git a/lib/src/extensions/ble_characteristic_extension.dart b/lib/src/extensions/ble_characteristic_extension.dart index 6107abb2..39a22a9a 100644 --- a/lib/src/extensions/ble_characteristic_extension.dart +++ b/lib/src/extensions/ble_characteristic_extension.dart @@ -21,12 +21,24 @@ extension BleCharacteristicExtension on BleCharacteristic { CharacteristicSubscription(this, CharacteristicProperty.indicate); /// Unsubscribes notifications/indications from this characteristic. - Future unsubscribe({Duration? timeout}) => - UniversalBle.unsubscribe(_deviceId, _serviceId, uuid, timeout: timeout); + Future unsubscribe({Duration? timeout, String? queueId}) => + UniversalBle.unsubscribe( + _deviceId, + _serviceId, + uuid, + timeout: timeout, + queueId: queueId, + ); /// Reads the current value of the characteristic. - Future read({Duration? timeout}) => - UniversalBle.read(_deviceId, _serviceId, uuid, timeout: timeout); + Future read({Duration? timeout, String? queueId}) => + UniversalBle.read( + _deviceId, + _serviceId, + uuid, + timeout: timeout, + queueId: queueId, + ); /// Writes a value to the characteristic. /// @@ -38,6 +50,7 @@ extension BleCharacteristicExtension on BleCharacteristic { List value, { bool withResponse = true, Duration? timeout, + String? queueId, }) async { await UniversalBle.write( _deviceId, @@ -46,6 +59,7 @@ extension BleCharacteristicExtension on BleCharacteristic { Uint8List.fromList(value), withoutResponse: !withResponse, timeout: timeout, + queueId: queueId, ); } @@ -85,7 +99,7 @@ class CharacteristicSubscription { final bool isSupported; CharacteristicSubscription(this._characteristic, this._property) - : isSupported = _characteristic.properties.contains(_property); + : isSupported = _characteristic.properties.contains(_property); /// Registers a listener for incoming data from the characteristic. StreamSubscription listen( @@ -103,7 +117,7 @@ class CharacteristicSubscription { } /// Subscribes to this characteristic. - Future subscribe({Duration? timeout}) { + Future subscribe({Duration? timeout, String? queueId}) { if (!isSupported) throw Exception('Operation not supported'); if (_property == CharacteristicProperty.indicate) { @@ -112,6 +126,7 @@ class CharacteristicSubscription { _characteristic._serviceId, _characteristic.uuid, timeout: timeout, + queueId: queueId, ); } @@ -120,17 +135,19 @@ class CharacteristicSubscription { _characteristic._serviceId, _characteristic.uuid, timeout: timeout, + queueId: queueId, ); } /// Unsubscribes from this characteristic. - Future unsubscribe({Duration? timeout}) { + Future unsubscribe({Duration? timeout, String? queueId}) { if (!isSupported) throw Exception('Operation not supported'); return UniversalBle.unsubscribe( _characteristic._deviceId, _characteristic._serviceId, _characteristic.uuid, timeout: timeout, + queueId: queueId, ); } diff --git a/lib/src/extensions/ble_device_extension.dart b/lib/src/extensions/ble_device_extension.dart index 31e862ab..11ade447 100644 --- a/lib/src/extensions/ble_device_extension.dart +++ b/lib/src/extensions/ble_device_extension.dart @@ -25,7 +25,8 @@ extension BleDeviceExtension on BleDevice { ); /// Disconnects from the device. - Future disconnect() => UniversalBle.disconnect(deviceId); + Future disconnect({Duration? timeout, String? queueId}) => + UniversalBle.disconnect(deviceId, timeout: timeout, queueId: queueId); /// Requests an MTU (Maximum Transmission Unit) value for the connection. /// @@ -34,8 +35,8 @@ extension BleDeviceExtension on BleDevice { /// which may differ from `expectedMtu`. /// /// See [UniversalBle.requestMtu] for platform limitations and best practices. - Future requestMtu(int expectedMtu) => - UniversalBle.requestMtu(deviceId, expectedMtu); + Future requestMtu(int expectedMtu, {String? queueId}) => + UniversalBle.requestMtu(deviceId, expectedMtu, queueId: queueId); /// Check if a device is paired. /// @@ -43,11 +44,16 @@ extension BleDeviceExtension on BleDevice { /// Returns true/false if it manages to execute the command. /// Returns null when no `pairingCommand` is passed. /// Note that it will trigger pairing if the device is not already paired. - Future isPaired({BleCommand? pairingCommand, Duration? timeout}) { + Future isPaired({ + BleCommand? pairingCommand, + Duration? timeout, + String? queueId, + }) { return UniversalBle.isPaired( deviceId, pairingCommand: pairingCommand, timeout: timeout, + queueId: queueId, ); } @@ -62,19 +68,24 @@ extension BleDeviceExtension on BleDevice { /// /// On `Web/Windows` and `Web/Linux`, it does not work for devices that use `ConfirmOnly` pairing. /// Can throw `PairingException`, `ConnectionException` or `PlatformException`. - Future pair({BleCommand? pairingCommand, Duration? timeout}) { + Future pair({ + BleCommand? pairingCommand, + Duration? timeout, + String? queueId, + }) { return UniversalBle.pair( deviceId, pairingCommand: pairingCommand, timeout: timeout, + queueId: queueId, ); } /// Unpair a device. /// /// It might throw an error if device is not paired. - Future unpair({Duration? timeout}) => - UniversalBle.unpair(deviceId, timeout: timeout); + Future unpair({Duration? timeout, String? queueId}) => + UniversalBle.unpair(deviceId, timeout: timeout, queueId: queueId); /// Discovers the services offered by the device. /// @@ -82,11 +93,13 @@ extension BleDeviceExtension on BleDevice { Future> discoverServices({ Duration? timeout, bool withDescriptors = false, + String? queueId, }) async { List servicesCache = await UniversalBle.discoverServices( deviceId, withDescriptors: withDescriptors, timeout: timeout, + queueId: queueId, ); CacheHandler.instance.saveServices(deviceId, servicesCache); return servicesCache; @@ -101,13 +114,17 @@ extension BleDeviceExtension on BleDevice { String service, { bool preferCached = true, Duration? timeout, + String? queueId, }) async { List discoveredServices = []; if (preferCached) { discoveredServices = CacheHandler.instance.getServices(deviceId) ?? []; } if (discoveredServices.isEmpty) { - discoveredServices = await discoverServices(timeout: timeout); + discoveredServices = await discoverServices( + timeout: timeout, + queueId: queueId, + ); } if (discoveredServices.isEmpty) { @@ -137,11 +154,13 @@ extension BleDeviceExtension on BleDevice { required String service, bool preferCached = true, Duration? timeout, + String? queueId, }) async { BleService bluetoothService = await getService( service, preferCached: preferCached, timeout: timeout, + queueId: queueId, ); return bluetoothService.getCharacteristic(characteristic); } diff --git a/lib/src/universal_ble.dart b/lib/src/universal_ble.dart index bca2f84a..6a22c224 100644 --- a/lib/src/universal_ble.dart +++ b/lib/src/universal_ble.dart @@ -66,9 +66,12 @@ class UniversalBle { /// Get Bluetooth availability state. /// To be notified of updates, set [onAvailabilityChange] listener. - static Future getBluetoothAvailabilityState() async { + static Future getBluetoothAvailabilityState({ + String? queueId, + }) async { return await _bleCommandQueue.queueCommand( () => _platform.getBluetoothAvailabilityState(), + queueId: queueId, ); } @@ -105,28 +108,34 @@ class UniversalBle { static Future startScan({ ScanFilter? scanFilter, PlatformConfig? platformConfig, + String? queueId, }) async { return await _bleCommandQueue.queueCommandWithoutTimeout( () => _platform.startScan( scanFilter: scanFilter, platformConfig: platformConfig, ), + queueId: queueId, ); } /// Stop scan. /// Set [onScanResult] listener to `null` if you don't need it anymore. /// It might throw errors if Bluetooth is not available. - static Future stopScan() async { + static Future stopScan({String? queueId}) async { return await _bleCommandQueue.queueCommandWithoutTimeout( () => _platform.stopScan(), + queueId: queueId, ); } /// Check if currently scanning for devices. /// Returns `true` if scanning is active, `false` otherwise. - static Future isScanning() async { - return await _bleCommandQueue.queueCommand(() => _platform.isScanning()); + static Future isScanning({String? queueId}) async { + return await _bleCommandQueue.queueCommand( + () => _platform.isScanning(), + queueId: queueId, + ); } /// Connect to a device. @@ -166,7 +175,11 @@ class UniversalBle { /// Disconnect from a device. /// Get notified of connection state changes in [onConnectionChange] listener. - static Future disconnect(String deviceId, {Duration? timeout}) async { + static Future disconnect( + String deviceId, { + Duration? timeout, + String? queueId, + }) async { timeout ??= const Duration(seconds: 60); BleConnectionState? connectionState; try { @@ -186,6 +199,7 @@ class UniversalBle { () => _platform.disconnect(deviceId), timeout: timeout, deviceId: deviceId, + queueId: queueId, ) .catchError((error) { if (completer.isCompleted) return; @@ -219,11 +233,13 @@ class UniversalBle { String deviceId, { bool withDescriptors = false, Duration? timeout, + String? queueId, }) async { return await _bleCommandQueue.queueCommand( () => _platform.discoverServices(deviceId, withDescriptors), timeout: timeout, deviceId: deviceId, + queueId: queueId, ); } @@ -235,6 +251,7 @@ class UniversalBle { String service, String characteristic, { Duration? timeout, + String? queueId, }) async { return _sendBleInputPropertyCommand( deviceId, @@ -242,6 +259,7 @@ class UniversalBle { characteristic, BleInputProperty.notification, timeout: timeout, + queueId: queueId, ); } @@ -253,6 +271,7 @@ class UniversalBle { String service, String characteristic, { Duration? timeout, + String? queueId, }) async { return _sendBleInputPropertyCommand( deviceId, @@ -260,6 +279,7 @@ class UniversalBle { characteristic, BleInputProperty.indication, timeout: timeout, + queueId: queueId, ); } @@ -269,6 +289,7 @@ class UniversalBle { String service, String characteristic, { Duration? timeout, + String? queueId, }) async { return _sendBleInputPropertyCommand( deviceId, @@ -276,6 +297,7 @@ class UniversalBle { characteristic, BleInputProperty.disabled, timeout: timeout, + queueId: queueId, ); } @@ -286,6 +308,7 @@ class UniversalBle { String service, String characteristic, { Duration? timeout, + String? queueId, }) async { return await _bleCommandQueue.queueCommand( () => _platform.readValue( @@ -296,6 +319,7 @@ class UniversalBle { ), timeout: timeout, deviceId: deviceId, + queueId: queueId, ); } @@ -308,6 +332,7 @@ class UniversalBle { Uint8List value, { bool withoutResponse = false, Duration? timeout, + String? queueId, }) async { await _bleCommandQueue.queueCommand( () => _platform.writeValue( @@ -321,6 +346,7 @@ class UniversalBle { ), timeout: timeout, deviceId: deviceId, + queueId: queueId, ); } @@ -343,11 +369,13 @@ class UniversalBle { String deviceId, int expectedMtu, { Duration? timeout, + String? queueId, }) async { return await _bleCommandQueue.queueCommand( () => _platform.requestMtu(deviceId, expectedMtu), timeout: timeout, deviceId: deviceId, + queueId: queueId, ); } @@ -367,11 +395,13 @@ class UniversalBle { String deviceId, BleConnectionPriority priority, { Duration? timeout, + String? queueId, }) async { return await _bleCommandQueue.queueCommand( () => _platform.requestConnectionPriority(deviceId, priority), timeout: timeout, deviceId: deviceId, + queueId: queueId, ); } @@ -386,11 +416,16 @@ class UniversalBle { /// Throws [UniversalBleException] if: /// - The device is not connected /// - Reading RSSI fails - static Future readRssi(String deviceId, {Duration? timeout}) async { + static Future readRssi( + String deviceId, { + Duration? timeout, + String? queueId, + }) async { return await _bleCommandQueue.queueCommand( () => _platform.readRssi(deviceId), timeout: timeout, deviceId: deviceId, + queueId: queueId, ); } @@ -404,12 +439,14 @@ class UniversalBle { String deviceId, { BleCommand? pairingCommand, Duration? timeout, + String? queueId, }) async { if (BleCapabilities.hasSystemPairingApi) { return _bleCommandQueue.queueCommand( () => _platform.isPaired(deviceId), deviceId: deviceId, timeout: timeout, + queueId: queueId, ); } @@ -424,6 +461,7 @@ class UniversalBle { pairingCommand, updateCallbackValue: false, timeout: timeout, + queueId: queueId, ); // Because pairingCommand will be never null, so we wont get Unknown result here @@ -449,12 +487,14 @@ class UniversalBle { String deviceId, { BleCommand? pairingCommand, Duration? timeout, + String? queueId, }) async { if (BleCapabilities.hasSystemPairingApi) { bool paired = await _bleCommandQueue.queueCommand( () => _platform.pair(deviceId), deviceId: deviceId, timeout: timeout, + queueId: queueId, ); if (!paired) throw PairingException(); } else { @@ -465,17 +505,23 @@ class UniversalBle { deviceId, pairingCommand, timeout: timeout, + queueId: queueId, ); } } /// Unpair a device. /// It might throw an error if device is not paired. - static Future unpair(String deviceId, {Duration? timeout}) async { + static Future unpair( + String deviceId, { + Duration? timeout, + String? queueId, + }) async { return await _bleCommandQueue.queueCommand( () => _platform.unpair(deviceId), deviceId: deviceId, timeout: timeout, + queueId: queueId, ); } @@ -487,10 +533,12 @@ class UniversalBle { static Future> getSystemDevices({ List? withServices, Duration? timeout, + String? queueId, }) async { return await _bleCommandQueue.queueCommand( () => _platform.getSystemDevices(withServices?.toValidUUIDList()), timeout: timeout, + queueId: queueId, ); } @@ -500,36 +548,47 @@ class UniversalBle { static Future getConnectionState( String deviceId, { Duration? timeout, + String? queueId, }) async { return await _bleCommandQueue.queueCommand( () => _platform.getConnectionState(deviceId), timeout: timeout, + queueId: queueId, ); } /// Enable Bluetooth. /// It might throw errors if Bluetooth is not available. /// Not supported on `Web` and `Apple`. - static Future enableBluetooth({Duration? timeout}) async { + static Future enableBluetooth({ + Duration? timeout, + String? queueId, + }) async { return await _bleCommandQueue.queueCommand( () => _platform.enableBluetooth(), timeout: timeout, + queueId: queueId, ); } /// Disable Bluetooth. /// It might throw errors if Bluetooth is not available. /// Not supported on `Web` and `Apple`. - static Future disableBluetooth({Duration? timeout}) async { + static Future disableBluetooth({ + Duration? timeout, + String? queueId, + }) async { return await _bleCommandQueue.queueCommand( () => _platform.disableBluetooth(), timeout: timeout, + queueId: queueId, ); } /// Clear a queue. /// Use [BleCommandQueue.globalQueueId] to clear the global queue. /// To clear the queue of a specific device, use `deviceId` as [id]. + /// To clear a custom queue, pass the same `queueId` string used when enqueueing commands. /// If no [id] is provided, all queues will be cleared. static void clearQueue([String? id]) => _bleCommandQueue.clearQueue(id); @@ -562,13 +621,15 @@ class UniversalBle { String deviceId, String service, String characteristic, - BleInputProperty bleInputProperty, - ) async { + BleInputProperty bleInputProperty, { + String? queueId, + }) async { return _sendBleInputPropertyCommand( deviceId, service, characteristic, bleInputProperty, + queueId: queueId, ); } @@ -655,6 +716,7 @@ class UniversalBle { String characteristic, BleInputProperty bleInputProperty, { Duration? timeout, + String? queueId, }) async { return await _bleCommandQueue.queueCommand( () => _platform.setNotifiable( @@ -665,6 +727,7 @@ class UniversalBle { ), deviceId: deviceId, timeout: timeout, + queueId: queueId, ); } @@ -673,8 +736,13 @@ class UniversalBle { BleCommand? bleCommand, { bool updateCallbackValue = false, Duration? timeout, + String? queueId, }) async { - var connectionState = await getConnectionState(deviceId, timeout: timeout); + var connectionState = await getConnectionState( + deviceId, + timeout: timeout, + queueId: queueId, + ); // Try to connect first if (connectionState != BleConnectionState.connected) { UniversalLogger.logInfo("Connecting to $deviceId"); @@ -684,16 +752,28 @@ class UniversalBle { List services = await discoverServices( deviceId, timeout: timeout, + queueId: queueId, ); UniversalLogger.logInfo("Discovered services: ${services.length}"); if (bleCommand == null) { // Just attempt pairing - await _attemptPairingReadingAll(deviceId, services, timeout: timeout); + await _attemptPairingReadingAll( + deviceId, + services, + timeout: timeout, + queueId: queueId, + ); return; } - await _executeBleCommand(deviceId, services, bleCommand, timeout: timeout); + await _executeBleCommand( + deviceId, + services, + bleCommand, + timeout: timeout, + queueId: queueId, + ); if (updateCallbackValue) _platform.updatePairingState(deviceId, true); } @@ -702,6 +782,7 @@ class UniversalBle { String deviceId, List services, { Duration? timeout, + String? queueId, }) async { bool containsReadCharacteristics = false; try { @@ -715,6 +796,7 @@ class UniversalBle { service.uuid, characteristic.uuid, timeout: timeout ?? const Duration(seconds: 30), + queueId: queueId, ); } } @@ -731,6 +813,7 @@ class UniversalBle { List services, BleCommand bleCommand, { Duration? timeout, + String? queueId, }) async { // First find BleCommand's characteristic BleCharacteristic? characteristic; @@ -779,6 +862,7 @@ class UniversalBle { value, withoutResponse: withoutResponse, timeout: timeout, + queueId: queueId, ); } else { // Fallback to read if supported @@ -787,6 +871,7 @@ class UniversalBle { bleCommand.service, bleCommand.characteristic, timeout: timeout ?? const Duration(seconds: 30), + queueId: queueId, ); } } catch (e) { diff --git a/lib/src/utils/ble_command_queue.dart b/lib/src/utils/ble_command_queue.dart index b9bd767e..299d620f 100644 --- a/lib/src/utils/ble_command_queue.dart +++ b/lib/src/utils/ble_command_queue.dart @@ -15,14 +15,21 @@ class BleCommandQueue { Future Function() command, { String? deviceId, Duration? timeout, + String? queueId, }) { Duration? timeoutDuration = timeout ?? this.timeout; if (timeoutDuration == null) { - return queueCommandWithoutTimeout(command, deviceId: deviceId); + return queueCommandWithoutTimeout( + command, + deviceId: deviceId, + queueId: queueId, + ); } return switch (queueType) { - QueueType.global => _queue().add(command, timeoutDuration), - QueueType.perDevice => _queue(deviceId).add(command, timeoutDuration), + QueueType.global => _queue(queueId).add(command, timeoutDuration), + QueueType.perDevice => _queue( + queueId ?? deviceId, + ).add(command, timeoutDuration), QueueType.none => command().timeout(timeoutDuration), }; } @@ -30,16 +37,19 @@ class BleCommandQueue { Future queueCommandWithoutTimeout( Future Function() command, { String? deviceId, + String? queueId, }) { return switch (queueType) { - QueueType.global => _queue().add(command), - QueueType.perDevice => _queue(deviceId).add(command), + QueueType.global => _queue(queueId).add(command), + QueueType.perDevice => _queue(queueId ?? deviceId).add(command), QueueType.none => command(), }; } - Queue _queue([String? id = globalQueueId]) => - _queueMap[id] ?? _newQueue(id ?? globalQueueId); + Queue _queue(String? id) { + final queueKey = id ?? globalQueueId; + return _queueMap[queueKey] ?? _newQueue(queueKey); + } Queue _newQueue(String id) { final queue = Queue(); diff --git a/test/ble_command_queue_test.dart b/test/ble_command_queue_test.dart new file mode 100644 index 00000000..d6408b8b --- /dev/null +++ b/test/ble_command_queue_test.dart @@ -0,0 +1,352 @@ +import 'dart:async'; + +import 'package:flutter_test/flutter_test.dart'; +import 'package:universal_ble/src/utils/ble_command_queue.dart'; +import 'package:universal_ble/universal_ble.dart'; + +void main() { + group('BleCommandQueue', () { + test('global queue executes commands sequentially', () async { + final commandQueue = BleCommandQueue(); + final order = []; + + final firstStarted = Completer(); + final releaseFirst = Completer(); + final secondStarted = Completer(); + + final first = commandQueue.queueCommand(() async { + firstStarted.complete(); + await releaseFirst.future; + order.add(1); + }); + final second = commandQueue.queueCommand(() async { + secondStarted.complete(); + order.add(2); + }); + + await firstStarted.future; + expect(order, isEmpty); + + releaseFirst.complete(); + await first; + await secondStarted.future; + await second; + + expect(order, [1, 2]); + }); + + test('null queueId uses the global queue', () async { + final commandQueue = BleCommandQueue(); + final order = []; + + final firstStarted = Completer(); + final release = Completer(); + final secondStarted = Completer(); + + final first = commandQueue.queueCommand( + () async { + firstStarted.complete(); + await release.future; + order.add(1); + }, + queueId: null, + ); + final second = commandQueue.queueCommand( + () async { + secondStarted.complete(); + order.add(2); + }, + queueId: null, + ); + + await firstStarted.future; + expect(order, isEmpty); + + release.complete(); + await first; + await secondStarted.future; + await second; + + expect(order, [1, 2]); + }); + + test('custom queueId creates an independent queue in global mode', () async { + final commandQueue = BleCommandQueue(); + final order = []; + + final releaseDefault = Completer(); + final releaseCustom = Completer(); + + commandQueue.queueCommand( + () async { + await releaseDefault.future; + order.add('default'); + }, + queueId: null, + ); + commandQueue.queueCommand( + () async { + await releaseCustom.future; + order.add('custom'); + }, + queueId: 'tilta', + ); + + await pumpEventQueue(); + expect(order, isEmpty); + + releaseCustom.complete(); + await pumpEventQueue(); + + expect(order, ['custom']); + + releaseDefault.complete(); + await pumpEventQueue(); + + expect(order, ['custom', 'default']); + }); + + test('perDevice queue isolates commands by device', () async { + final commandQueue = BleCommandQueue(queueType: QueueType.perDevice); + final order = []; + + final releaseA = Completer(); + final releaseB = Completer(); + final deviceBStarted = Completer(); + + commandQueue.queueCommand( + () async { + await releaseA.future; + order.add('device-a'); + }, + deviceId: 'device-a', + ); + commandQueue.queueCommand( + () async { + deviceBStarted.complete(); + await releaseB.future; + order.add('device-b'); + }, + deviceId: 'device-b', + ); + + await deviceBStarted.future; + expect(order, isEmpty); + + releaseB.complete(); + await pumpEventQueue(); + expect(order, ['device-b']); + + releaseA.complete(); + await pumpEventQueue(); + expect(order, ['device-b', 'device-a']); + }); + + test('perDevice queueId overrides device routing', () async { + final commandQueue = BleCommandQueue(queueType: QueueType.perDevice); + final order = []; + + final releaseShared = Completer(); + final secondStarted = Completer(); + + commandQueue.queueCommand( + () async { + await releaseShared.future; + order.add('first'); + }, + deviceId: 'device-a', + queueId: 'shared', + ); + commandQueue.queueCommand( + () async { + secondStarted.complete(); + order.add('second'); + }, + deviceId: 'device-b', + queueId: 'shared', + ); + + await pumpEventQueue(); + expect(order, isEmpty); + + releaseShared.complete(); + await secondStarted.future; + await pumpEventQueue(); + + expect(order, ['first', 'second']); + }); + + test('QueueType.none runs commands without queueing', () async { + final commandQueue = BleCommandQueue(queueType: QueueType.none); + final order = []; + + final firstStarted = Completer(); + final secondStarted = Completer(); + final releaseFirst = Completer(); + + commandQueue.queueCommand(() async { + firstStarted.complete(); + await releaseFirst.future; + order.add(1); + }); + commandQueue.queueCommand(() async { + secondStarted.complete(); + order.add(2); + }); + + await firstStarted.future; + await secondStarted.future; + + expect(order, [2]); + + releaseFirst.complete(); + await pumpEventQueue(); + + expect(order, [2, 1]); + }); + + test('queueCommandWithoutTimeout bypasses global timeout', () async { + final commandQueue = BleCommandQueue() + ..timeout = const Duration(milliseconds: 10); + + await expectLater( + commandQueue.queueCommand( + () => Future.delayed(const Duration(milliseconds: 100)), + ), + throwsA(isA()), + ); + + await expectLater( + commandQueue.queueCommandWithoutTimeout( + () => Future.delayed(const Duration(milliseconds: 30)), + ), + completes, + ); + }); + + test('onQueueUpdate reports remaining items per queue id', () async { + final commandQueue = BleCommandQueue(); + final updates = >{}; + + commandQueue.onQueueUpdate = (id, remaining) { + updates.putIfAbsent(id, () => []).add(remaining); + }; + + final release = Completer(); + final started = Completer(); + + final first = commandQueue.queueCommand( + () async { + started.complete(); + await release.future; + }, + queueId: 'tilta', + ); + commandQueue.queueCommand(() async {}, queueId: 'tilta'); + commandQueue.queueCommand(() async {}, queueId: 'tilta'); + + await started.future; + expect(updates['tilta'], contains(3)); + + release.complete(); + await first; + await pumpEventQueue(); + + expect(updates['tilta']!.last, 0); + }); + + test('clearQueue cancels pending commands for a specific queue id', () async { + final commandQueue = BleCommandQueue(); + final order = []; + + final release = Completer(); + final started = Completer(); + + commandQueue.queueCommand( + () async { + started.complete(); + await release.future; + order.add('in-flight'); + }, + queueId: 'tilta', + ); + final pending = commandQueue.queueCommand( + () async { + order.add('pending'); + return 'pending'; + }, + queueId: 'tilta', + ); + + await started.future; + commandQueue.clearQueue('tilta'); + + await expectLater( + pending, + throwsA( + isA().having( + (e) => e.toString(), + 'message', + contains('Queue Cancelled'), + ), + ), + ); + + release.complete(); + await pumpEventQueue(); + + expect(order, ['in-flight']); + }); + + test('clearQueue without id clears all queues', () async { + final commandQueue = BleCommandQueue(); + final releaseDefault = Completer(); + final releaseCustom = Completer(); + final defaultStarted = Completer(); + final customStarted = Completer(); + + commandQueue.queueCommand( + () async { + defaultStarted.complete(); + await releaseDefault.future; + }, + queueId: null, + ); + commandQueue.queueCommand( + () async { + customStarted.complete(); + await releaseCustom.future; + }, + queueId: 'tilta', + ); + + final pendingDefault = commandQueue.queueCommand( + () async {}, + queueId: null, + ); + final pendingCustom = commandQueue.queueCommand( + () async {}, + queueId: 'tilta', + ); + + await defaultStarted.future; + await customStarted.future; + + commandQueue.clearQueue(null); + + await expectLater(pendingDefault, throwsA(isA())); + await expectLater(pendingCustom, throwsA(isA())); + + releaseDefault.complete(); + releaseCustom.complete(); + }); + + test('new commands recreate a cleared queue id', () async { + final commandQueue = BleCommandQueue(); + + commandQueue.clearQueue(BleCommandQueue.globalQueueId); + + expect(await commandQueue.queueCommand(() async => 7), 7); + }); + }); +} diff --git a/test/queue_test.dart b/test/queue_test.dart new file mode 100644 index 00000000..264c73dd --- /dev/null +++ b/test/queue_test.dart @@ -0,0 +1,163 @@ +import 'dart:async'; + +import 'package:flutter_test/flutter_test.dart'; +import 'package:universal_ble/src/queue.dart'; + +void main() { + group('Queue', () { + test('executes commands sequentially in order', () async { + final queue = Queue(); + final order = []; + + final firstStarted = Completer(); + final releaseFirst = Completer(); + final secondStarted = Completer(); + + final future1 = queue.add(() async { + firstStarted.complete(); + await releaseFirst.future; + order.add(1); + return 1; + }); + final future2 = queue.add(() async { + secondStarted.complete(); + order.add(2); + return 2; + }); + + await firstStarted.future; + expect(order, isEmpty); + + releaseFirst.complete(); + await future1; + await secondStarted.future; + await future2; + + expect(order, [1, 2]); + }); + + test('completes void commands with null', () async { + final queue = Queue(); + + await queue.add(() async {}); + + expect(await queue.add(() async => null), isNull); + }); + + test('returns command results', () async { + final queue = Queue(); + + expect(await queue.add(() async => 42), 42); + expect(await queue.add(() async => 'ok'), 'ok'); + }); + + test('propagates command errors', () async { + final queue = Queue(); + + final future = queue.add(() async { + throw StateError('failed'); + }); + + await expectLater(future, throwsA(isA())); + }); + + test('times out slow commands', () async { + final queue = Queue(); + + final future = queue.add( + () => Future.delayed(const Duration(seconds: 5)), + const Duration(milliseconds: 50), + ); + + await expectLater(future, throwsA(isA())); + }); + + test('reports remaining items via onRemainingItemsUpdate', () async { + final queue = Queue(); + final remaining = []; + + queue.onRemainingItemsUpdate = remaining.add; + + final release = Completer(); + final started = Completer(); + + final first = queue.add(() async { + started.complete(); + await release.future; + }); + queue.add(() async {}); + queue.add(() async {}); + + await started.future; + expect(remaining, contains(3)); + + release.complete(); + await first; + await pumpEventQueue(); + + expect(remaining.last, 0); + }); + + test('dispose completes pending commands with error', () async { + final queue = Queue(); + final started = Completer(); + + queue.add(() async { + started.complete(); + await Future.delayed(const Duration(seconds: 5)); + }); + + final pending = queue.add(() async => 'pending'); + + await started.future; + queue.dispose(); + + await expectLater( + pending, + throwsA( + isA().having( + (e) => e.toString(), + 'message', + contains('Queue Cancelled'), + ), + ), + ); + }); + + test('dispose prevents adding new commands', () { + final queue = Queue()..dispose(); + + expect( + () => queue.add(() async {}), + throwsA( + isA().having( + (e) => e.toString(), + 'message', + contains('Queue Cancelled'), + ), + ), + ); + }); + + test('in-flight command completes after dispose', () async { + final queue = Queue(); + final release = Completer(); + final started = Completer(); + + final inFlight = queue.add(() async { + started.complete(); + await release.future; + return 'done'; + }); + + final pending = queue.add(() async => 'pending'); + unawaited(pending.catchError((_) => 'ignored')); + + await started.future; + queue.dispose(); + + release.complete(); + expect(await inFlight, 'done'); + }); + }); +}